datafusion_proto/logical_plan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43    file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46    datasource::{
47        file_format::{
48            csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49        },
50        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51        view::ViewTable,
52        TableProvider,
53    },
54    datasource::{provider_as_source, source_as_provider},
55    prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59    context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60    DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63    dml,
64    logical_plan::{
65        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68        SubqueryAlias, TableScan, Values, Window,
69    },
70    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71    Statement, WindowUDF,
72};
73use datafusion_expr::{
74    AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
75    TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
87pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
88    fn try_decode(buf: &[u8]) -> Result<Self>
89    where
90        Self: Sized;
91
92    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
93    where
94        B: BufMut,
95        Self: Sized;
96
97    fn try_into_logical_plan(
98        &self,
99        ctx: &SessionContext,
100        extension_codec: &dyn LogicalExtensionCodec,
101    ) -> Result<LogicalPlan>;
102
103    fn try_from_logical_plan(
104        plan: &LogicalPlan,
105        extension_codec: &dyn LogicalExtensionCodec,
106    ) -> Result<Self>
107    where
108        Self: Sized;
109}
110
111pub trait LogicalExtensionCodec: Debug + Send + Sync {
112    fn try_decode(
113        &self,
114        buf: &[u8],
115        inputs: &[LogicalPlan],
116        ctx: &SessionContext,
117    ) -> Result<Extension>;
118
119    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
120
121    fn try_decode_table_provider(
122        &self,
123        buf: &[u8],
124        table_ref: &TableReference,
125        schema: SchemaRef,
126        ctx: &SessionContext,
127    ) -> Result<Arc<dyn TableProvider>>;
128
129    fn try_encode_table_provider(
130        &self,
131        table_ref: &TableReference,
132        node: Arc<dyn TableProvider>,
133        buf: &mut Vec<u8>,
134    ) -> Result<()>;
135
136    fn try_decode_file_format(
137        &self,
138        _buf: &[u8],
139        _ctx: &SessionContext,
140    ) -> Result<Arc<dyn FileFormatFactory>> {
141        not_impl_err!("LogicalExtensionCodec is not provided for file format")
142    }
143
144    fn try_encode_file_format(
145        &self,
146        _buf: &mut Vec<u8>,
147        _node: Arc<dyn FileFormatFactory>,
148    ) -> Result<()> {
149        Ok(())
150    }
151
152    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
153        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
154    }
155
156    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
157        Ok(())
158    }
159
160    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
161        not_impl_err!(
162            "LogicalExtensionCodec is not provided for aggregate function {name}"
163        )
164    }
165
166    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
167        Ok(())
168    }
169
170    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
171        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
172    }
173
174    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
175        Ok(())
176    }
177}
178
/// A [`LogicalExtensionCodec`] that supports no extensions.
///
/// Its required hooks all report a not-implemented error, so it only works for plans
/// that never need to call them.
#[derive(Clone, Debug)]
pub struct DefaultLogicalExtensionCodec {}
181
182impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
183    fn try_decode(
184        &self,
185        _buf: &[u8],
186        _inputs: &[LogicalPlan],
187        _ctx: &SessionContext,
188    ) -> Result<Extension> {
189        not_impl_err!("LogicalExtensionCodec is not provided")
190    }
191
192    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
193        not_impl_err!("LogicalExtensionCodec is not provided")
194    }
195
196    fn try_decode_table_provider(
197        &self,
198        _buf: &[u8],
199        _table_ref: &TableReference,
200        _schema: SchemaRef,
201        _ctx: &SessionContext,
202    ) -> Result<Arc<dyn TableProvider>> {
203        not_impl_err!("LogicalExtensionCodec is not provided")
204    }
205
206    fn try_encode_table_provider(
207        &self,
208        _table_ref: &TableReference,
209        _node: Arc<dyn TableProvider>,
210        _buf: &mut Vec<u8>,
211    ) -> Result<()> {
212        not_impl_err!("LogicalExtensionCodec is not provided")
213    }
214}
215
/// Converts a required child plan field into a `LogicalPlan`.
///
/// `$PB` must evaluate to an `Option<P>` where `P` dereferences to a type
/// implementing `AsLogicalPlan` (in this module, `Option<Box<LogicalPlanNode>>`).
/// `$CTX` and `$CODEC` are forwarded unchanged to
/// `AsLogicalPlan::try_into_logical_plan`.
///
/// Evaluates to an error built by `proto_error` when the field is `None`.
///
/// Every helper is referenced through a `$crate` path. As a result, call sites do not
/// need `proto_error` or the `AsLogicalPlan` trait in scope, which matters because
/// the macro is exported from the crate.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            // Fully-qualified call: works even when the trait is not imported.
            $crate::logical_plan::AsLogicalPlan::try_into_logical_plan(
                &**field, $CTX, $CODEC,
            )
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
226
227fn from_table_reference(
228    table_ref: Option<&protobuf::TableReference>,
229    error_context: &str,
230) -> Result<TableReference> {
231    let table_ref = table_ref.ok_or_else(|| {
232        DataFusionError::Internal(format!(
233            "Protobuf deserialization error, {error_context} was missing required field name."
234        ))
235    })?;
236
237    Ok(table_ref.clone().try_into()?)
238}
239
240/// Converts [LogicalPlan::TableScan] to [TableSource]
241/// method to be used to deserialize nodes
242/// serialized by [from_table_source]
243fn to_table_source(
244    node: &Option<Box<LogicalPlanNode>>,
245    ctx: &SessionContext,
246    extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248    if let Some(node) = node {
249        match node.try_into_logical_plan(ctx, extension_codec)? {
250            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251            _ => plan_err!("expected TableScan node"),
252        }
253    } else {
254        plan_err!("LogicalPlanNode should be provided")
255    }
256}
257
258/// converts [TableSource] to [LogicalPlan::TableScan]
259/// using [LogicalPlan::TableScan] was the best approach to
260/// serialize [TableSource] to [LogicalPlan::TableScan]
261fn from_table_source(
262    table_name: TableReference,
263    target: Arc<dyn TableSource>,
264    extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266    let projected_schema = target.schema().to_dfschema_ref()?;
267    let r = LogicalPlan::TableScan(TableScan {
268        table_name,
269        source: target,
270        projection: None,
271        projected_schema,
272        filters: vec![],
273        fetch: None,
274    });
275
276    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280    fn try_decode(buf: &[u8]) -> Result<Self>
281    where
282        Self: Sized,
283    {
284        LogicalPlanNode::decode(buf).map_err(|e| {
285            DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286        })
287    }
288
289    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290    where
291        B: BufMut,
292        Self: Sized,
293    {
294        self.encode(buf).map_err(|e| {
295            DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296        })
297    }
298
299    fn try_into_logical_plan(
300        &self,
301        ctx: &SessionContext,
302        extension_codec: &dyn LogicalExtensionCodec,
303    ) -> Result<LogicalPlan> {
304        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
305            proto_error(format!(
306                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
307            ))
308        })?;
309        match plan {
310            LogicalPlanType::Values(values) => {
311                let n_cols = values.n_cols as usize;
312                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
313                    Ok(Vec::new())
314                } else if values.values_list.len() % n_cols != 0 {
315                    internal_err!(
316                        "Invalid values list length, expect {} to be divisible by {}",
317                        values.values_list.len(),
318                        n_cols
319                    )
320                } else {
321                    values
322                        .values_list
323                        .chunks_exact(n_cols)
324                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
325                        .collect::<Result<Vec<_>, _>>()
326                        .map_err(|e| e.into())
327                }?;
328
329                LogicalPlanBuilder::values(values)?.build()
330            }
331            LogicalPlanType::Projection(projection) => {
332                let input: LogicalPlan =
333                    into_logical_plan!(projection.input, ctx, extension_codec)?;
334                let expr: Vec<Expr> =
335                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
336
337                let new_proj = project(input, expr)?;
338                match projection.optional_alias.as_ref() {
339                    Some(a) => match a {
340                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
341                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
342                                Arc::new(new_proj),
343                                alias.clone(),
344                            )?))
345                        }
346                    },
347                    _ => Ok(new_proj),
348                }
349            }
350            LogicalPlanType::Selection(selection) => {
351                let input: LogicalPlan =
352                    into_logical_plan!(selection.input, ctx, extension_codec)?;
353                let expr: Expr = selection
354                    .expr
355                    .as_ref()
356                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
357                    .transpose()?
358                    .ok_or_else(|| {
359                        DataFusionError::Internal("expression required".to_string())
360                    })?;
361                // .try_into()?;
362                LogicalPlanBuilder::from(input).filter(expr)?.build()
363            }
364            LogicalPlanType::Window(window) => {
365                let input: LogicalPlan =
366                    into_logical_plan!(window.input, ctx, extension_codec)?;
367                let window_expr =
368                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
369                LogicalPlanBuilder::from(input).window(window_expr)?.build()
370            }
371            LogicalPlanType::Aggregate(aggregate) => {
372                let input: LogicalPlan =
373                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
374                let group_expr =
375                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
376                let aggr_expr =
377                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
378                LogicalPlanBuilder::from(input)
379                    .aggregate(group_expr, aggr_expr)?
380                    .build()
381            }
382            LogicalPlanType::ListingScan(scan) => {
383                let schema: Schema = convert_required!(scan.schema)?;
384
385                let mut projection = None;
386                if let Some(columns) = &scan.projection {
387                    let column_indices = columns
388                        .columns
389                        .iter()
390                        .map(|name| schema.index_of(name))
391                        .collect::<Result<Vec<usize>, _>>()?;
392                    projection = Some(column_indices);
393                }
394
395                let filters =
396                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
397
398                let mut all_sort_orders = vec![];
399                for order in &scan.file_sort_order {
400                    all_sort_orders.push(from_proto::parse_sorts(
401                        &order.sort_expr_nodes,
402                        ctx,
403                        extension_codec,
404                    )?)
405                }
406
407                let file_format: Arc<dyn FileFormat> =
408                    match scan.file_format_type.as_ref().ok_or_else(|| {
409                        proto_error(format!(
410                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
411                        ))
412                    })? {
413                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
414                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
415                            #[cfg(feature = "parquet")]
416                            {
417                                let mut parquet = ParquetFormat::default();
418                                if let Some(options) = options {
419                                    parquet = parquet.with_options(options.try_into()?)
420                                }
421                                Arc::new(parquet)
422                            }
423                            #[cfg(not(feature = "parquet"))]
424                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
425                        }
426                        FileFormatType::Csv(protobuf::CsvFormat {
427                            options
428                        }) => {
429                            let mut csv = CsvFormat::default();
430                            if let Some(options) = options {
431                                csv = csv.with_options(options.try_into()?)
432                            }
433                            Arc::new(csv)
434                        },
435                        FileFormatType::Json(protobuf::NdJsonFormat {
436                            options
437                        }) => {
438                            let mut json = OtherNdJsonFormat::default();
439                            if let Some(options) = options {
440                                json = json.with_options(options.try_into()?)
441                            }
442                            Arc::new(json)
443                        }
444                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
445                        FileFormatType::Avro(..) => {
446                            #[cfg(feature = "avro")] 
447                            {
448                                Arc::new(AvroFormat)
449                            }
450                            #[cfg(not(feature = "avro"))]
451                            panic!("Unable to process avro file since `avro` feature is not enabled");
452                        }
453                    };
454
455                let table_paths = &scan
456                    .paths
457                    .iter()
458                    .map(ListingTableUrl::parse)
459                    .collect::<Result<Vec<_>, _>>()?;
460
461                let options = ListingOptions::new(file_format)
462                    .with_file_extension(&scan.file_extension)
463                    .with_table_partition_cols(
464                        scan.table_partition_cols
465                            .iter()
466                            .map(|col| {
467                                (
468                                    col.clone(),
469                                    schema
470                                        .field_with_name(col)
471                                        .unwrap()
472                                        .data_type()
473                                        .clone(),
474                                )
475                            })
476                            .collect(),
477                    )
478                    .with_collect_stat(scan.collect_stat)
479                    .with_target_partitions(scan.target_partitions as usize)
480                    .with_file_sort_order(all_sort_orders);
481
482                let config =
483                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
484                        .with_listing_options(options)
485                        .with_schema(Arc::new(schema));
486
487                let provider = ListingTable::try_new(config)?.with_cache(
488                    ctx.state()
489                        .runtime_env()
490                        .cache_manager
491                        .get_file_statistic_cache(),
492                );
493
494                let table_name =
495                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
496
497                LogicalPlanBuilder::scan_with_filters(
498                    table_name,
499                    provider_as_source(Arc::new(provider)),
500                    projection,
501                    filters,
502                )?
503                .build()
504            }
505            LogicalPlanType::CustomScan(scan) => {
506                let schema: Schema = convert_required!(scan.schema)?;
507                let schema = Arc::new(schema);
508                let mut projection = None;
509                if let Some(columns) = &scan.projection {
510                    let column_indices = columns
511                        .columns
512                        .iter()
513                        .map(|name| schema.index_of(name))
514                        .collect::<Result<Vec<usize>, _>>()?;
515                    projection = Some(column_indices);
516                }
517
518                let filters =
519                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
520
521                let table_name =
522                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
523
524                let provider = extension_codec.try_decode_table_provider(
525                    &scan.custom_table_data,
526                    &table_name,
527                    schema,
528                    ctx,
529                )?;
530
531                LogicalPlanBuilder::scan_with_filters(
532                    table_name,
533                    provider_as_source(provider),
534                    projection,
535                    filters,
536                )?
537                .build()
538            }
539            LogicalPlanType::Sort(sort) => {
540                let input: LogicalPlan =
541                    into_logical_plan!(sort.input, ctx, extension_codec)?;
542                let sort_expr: Vec<SortExpr> =
543                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
544                let fetch: Option<usize> = sort.fetch.try_into().ok();
545                LogicalPlanBuilder::from(input)
546                    .sort_with_limit(sort_expr, fetch)?
547                    .build()
548            }
549            LogicalPlanType::Repartition(repartition) => {
550                use datafusion::logical_expr::Partitioning;
551                let input: LogicalPlan =
552                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
553                use protobuf::repartition_node::PartitionMethod;
554                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
555                    internal_datafusion_err!(
556                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
557                    )
558                })?;
559
560                let partitioning_scheme = match pb_partition_method {
561                    PartitionMethod::Hash(protobuf::HashRepartition {
562                        hash_expr: pb_hash_expr,
563                        partition_count,
564                    }) => Partitioning::Hash(
565                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
566                        *partition_count as usize,
567                    ),
568                    PartitionMethod::RoundRobin(partition_count) => {
569                        Partitioning::RoundRobinBatch(*partition_count as usize)
570                    }
571                };
572
573                LogicalPlanBuilder::from(input)
574                    .repartition(partitioning_scheme)?
575                    .build()
576            }
577            LogicalPlanType::EmptyRelation(empty_relation) => {
578                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
579            }
580            LogicalPlanType::CreateExternalTable(create_extern_table) => {
581                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
582                    DataFusionError::Internal(String::from(
583                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
584                    ))
585                })?;
586
587                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
588                    DataFusionError::Internal(String::from(
589                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
590                    ))
591                })?;
592                let definition = if !create_extern_table.definition.is_empty() {
593                    Some(create_extern_table.definition.clone())
594                } else {
595                    None
596                };
597
598                let file_type = create_extern_table.file_type.as_str();
599                if ctx.table_factory(file_type).is_none() {
600                    internal_err!("No TableProviderFactory for file type: {file_type}")?
601                }
602
603                let mut order_exprs = vec![];
604                for expr in &create_extern_table.order_exprs {
605                    order_exprs.push(from_proto::parse_sorts(
606                        &expr.sort_expr_nodes,
607                        ctx,
608                        extension_codec,
609                    )?);
610                }
611
612                let mut column_defaults =
613                    HashMap::with_capacity(create_extern_table.column_defaults.len());
614                for (col_name, expr) in &create_extern_table.column_defaults {
615                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
616                    column_defaults.insert(col_name.clone(), expr);
617                }
618
619                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
620                    CreateExternalTable {
621                        schema: pb_schema.try_into()?,
622                        name: from_table_reference(
623                            create_extern_table.name.as_ref(),
624                            "CreateExternalTable",
625                        )?,
626                        location: create_extern_table.location.clone(),
627                        file_type: create_extern_table.file_type.clone(),
628                        table_partition_cols: create_extern_table
629                            .table_partition_cols
630                            .clone(),
631                        order_exprs,
632                        if_not_exists: create_extern_table.if_not_exists,
633                        temporary: create_extern_table.temporary,
634                        definition,
635                        unbounded: create_extern_table.unbounded,
636                        options: create_extern_table.options.clone(),
637                        constraints: constraints.into(),
638                        column_defaults,
639                    },
640                )))
641            }
642            LogicalPlanType::CreateView(create_view) => {
643                let plan = create_view
644                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
645                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
646                )))?
647                    .try_into_logical_plan(ctx, extension_codec)?;
648                let definition = if !create_view.definition.is_empty() {
649                    Some(create_view.definition.clone())
650                } else {
651                    None
652                };
653
654                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
655                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
656                    temporary: create_view.temporary,
657                    input: Arc::new(plan),
658                    or_replace: create_view.or_replace,
659                    definition,
660                })))
661            }
662            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
663                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
664                    DataFusionError::Internal(String::from(
665                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
666                    ))
667                })?;
668
669                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
670                    CreateCatalogSchema {
671                        schema_name: create_catalog_schema.schema_name.clone(),
672                        if_not_exists: create_catalog_schema.if_not_exists,
673                        schema: pb_schema.try_into()?,
674                    },
675                )))
676            }
677            LogicalPlanType::CreateCatalog(create_catalog) => {
678                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
679                    DataFusionError::Internal(String::from(
680                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
681                    ))
682                })?;
683
684                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
685                    CreateCatalog {
686                        catalog_name: create_catalog.catalog_name.clone(),
687                        if_not_exists: create_catalog.if_not_exists,
688                        schema: pb_schema.try_into()?,
689                    },
690                )))
691            }
692            LogicalPlanType::Analyze(analyze) => {
693                let input: LogicalPlan =
694                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
695                LogicalPlanBuilder::from(input)
696                    .explain(analyze.verbose, true)?
697                    .build()
698            }
699            LogicalPlanType::Explain(explain) => {
700                let input: LogicalPlan =
701                    into_logical_plan!(explain.input, ctx, extension_codec)?;
702                LogicalPlanBuilder::from(input)
703                    .explain(explain.verbose, false)?
704                    .build()
705            }
706            LogicalPlanType::SubqueryAlias(aliased_relation) => {
707                let input: LogicalPlan =
708                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
709                let alias = from_table_reference(
710                    aliased_relation.alias.as_ref(),
711                    "SubqueryAlias",
712                )?;
713                LogicalPlanBuilder::from(input).alias(alias)?.build()
714            }
715            LogicalPlanType::Limit(limit) => {
716                let input: LogicalPlan =
717                    into_logical_plan!(limit.input, ctx, extension_codec)?;
718                let skip = limit.skip.max(0) as usize;
719
720                let fetch = if limit.fetch < 0 {
721                    None
722                } else {
723                    Some(limit.fetch as usize)
724                };
725
726                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
727            }
728            LogicalPlanType::Join(join) => {
729                let left_keys: Vec<Expr> =
730                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
731                let right_keys: Vec<Expr> =
732                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
733                let join_type =
734                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
735                        proto_error(format!(
736                            "Received a JoinNode message with unknown JoinType {}",
737                            join.join_type
738                        ))
739                    })?;
740                let join_constraint = protobuf::JoinConstraint::try_from(
741                    join.join_constraint,
742                )
743                .map_err(|_| {
744                    proto_error(format!(
745                        "Received a JoinNode message with unknown JoinConstraint {}",
746                        join.join_constraint
747                    ))
748                })?;
749                let filter: Option<Expr> = join
750                    .filter
751                    .as_ref()
752                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
753                    .map_or(Ok(None), |v| v.map(Some))?;
754
755                let builder = LogicalPlanBuilder::from(into_logical_plan!(
756                    join.left,
757                    ctx,
758                    extension_codec
759                )?);
760                let builder = match join_constraint.into() {
761                    JoinConstraint::On => builder.join_with_expr_keys(
762                        into_logical_plan!(join.right, ctx, extension_codec)?,
763                        join_type.into(),
764                        (left_keys, right_keys),
765                        filter,
766                    )?,
767                    JoinConstraint::Using => {
768                        // The equijoin keys in using-join must be column.
769                        let using_keys = left_keys
770                            .into_iter()
771                            .map(|key| {
772                                key.try_as_col().cloned()
773                                    .ok_or_else(|| internal_datafusion_err!(
774                                        "Using join keys must be column references, got: {key:?}"
775                                    ))
776                            })
777                            .collect::<Result<Vec<_>, _>>()?;
778                        builder.join_using(
779                            into_logical_plan!(join.right, ctx, extension_codec)?,
780                            join_type.into(),
781                            using_keys,
782                        )?
783                    }
784                };
785
786                builder.build()
787            }
788            LogicalPlanType::Union(union) => {
789                if union.inputs.len() < 2 {
790                    return  Err( DataFusionError::Internal(String::from(
791                        "Protobuf deserialization error, Union was require at least two input.",
792                    )));
793                }
794                let (first, rest) = union.inputs.split_first().unwrap();
795                let mut builder = LogicalPlanBuilder::from(
796                    first.try_into_logical_plan(ctx, extension_codec)?,
797                );
798
799                for i in rest {
800                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
801                    builder = builder.union(plan)?;
802                }
803                builder.build()
804            }
805            LogicalPlanType::CrossJoin(crossjoin) => {
806                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
807                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
808
809                LogicalPlanBuilder::from(left).cross_join(right)?.build()
810            }
811            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
812                let input_plans: Vec<LogicalPlan> = inputs
813                    .iter()
814                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
815                    .collect::<Result<_>>()?;
816
817                let extension_node =
818                    extension_codec.try_decode(node, &input_plans, ctx)?;
819                Ok(LogicalPlan::Extension(extension_node))
820            }
821            LogicalPlanType::Distinct(distinct) => {
822                let input: LogicalPlan =
823                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
824                LogicalPlanBuilder::from(input).distinct()?.build()
825            }
826            LogicalPlanType::DistinctOn(distinct_on) => {
827                let input: LogicalPlan =
828                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
829                let on_expr =
830                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
831                let select_expr = from_proto::parse_exprs(
832                    &distinct_on.select_expr,
833                    ctx,
834                    extension_codec,
835                )?;
836                let sort_expr = match distinct_on.sort_expr.len() {
837                    0 => None,
838                    _ => Some(from_proto::parse_sorts(
839                        &distinct_on.sort_expr,
840                        ctx,
841                        extension_codec,
842                    )?),
843                };
844                LogicalPlanBuilder::from(input)
845                    .distinct_on(on_expr, select_expr, sort_expr)?
846                    .build()
847            }
848            LogicalPlanType::ViewScan(scan) => {
849                let schema: Schema = convert_required!(scan.schema)?;
850
851                let mut projection = None;
852                if let Some(columns) = &scan.projection {
853                    let column_indices = columns
854                        .columns
855                        .iter()
856                        .map(|name| schema.index_of(name))
857                        .collect::<Result<Vec<usize>, _>>()?;
858                    projection = Some(column_indices);
859                }
860
861                let input: LogicalPlan =
862                    into_logical_plan!(scan.input, ctx, extension_codec)?;
863
864                let definition = if !scan.definition.is_empty() {
865                    Some(scan.definition.clone())
866                } else {
867                    None
868                };
869
870                let provider = ViewTable::new(input, definition);
871
872                let table_name =
873                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
874
875                LogicalPlanBuilder::scan(
876                    table_name,
877                    provider_as_source(Arc::new(provider)),
878                    projection,
879                )?
880                .build()
881            }
882            LogicalPlanType::Prepare(prepare) => {
883                let input: LogicalPlan =
884                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
885                let data_types: Vec<DataType> = prepare
886                    .data_types
887                    .iter()
888                    .map(DataType::try_from)
889                    .collect::<Result<_, _>>()?;
890                LogicalPlanBuilder::from(input)
891                    .prepare(prepare.name.clone(), data_types)?
892                    .build()
893            }
894            LogicalPlanType::DropView(dropview) => {
895                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
896                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
897                    if_exists: dropview.if_exists,
898                    schema: Arc::new(convert_required!(dropview.schema)?),
899                })))
900            }
901            LogicalPlanType::CopyTo(copy) => {
902                let input: LogicalPlan =
903                    into_logical_plan!(copy.input, ctx, extension_codec)?;
904
905                let file_type: Arc<dyn FileType> = format_as_file_type(
906                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
907                );
908
909                Ok(LogicalPlan::Copy(dml::CopyTo {
910                    input: Arc::new(input),
911                    output_url: copy.output_url.clone(),
912                    partition_by: copy.partition_by.clone(),
913                    file_type,
914                    options: Default::default(),
915                }))
916            }
917            LogicalPlanType::Unnest(unnest) => {
918                let input: LogicalPlan =
919                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
920                Ok(LogicalPlan::Unnest(Unnest {
921                    input: Arc::new(input),
922                    exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
923                    list_type_columns: unnest
924                        .list_type_columns
925                        .iter()
926                        .map(|c| {
927                            let recursion_item = c.recursion.as_ref().unwrap();
928                            (
929                                c.input_index as _,
930                                ColumnUnnestList {
931                                    output_column: recursion_item
932                                        .output_column
933                                        .as_ref()
934                                        .unwrap()
935                                        .into(),
936                                    depth: recursion_item.depth as _,
937                                },
938                            )
939                        })
940                        .collect(),
941                    struct_type_columns: unnest
942                        .struct_type_columns
943                        .iter()
944                        .map(|c| *c as usize)
945                        .collect(),
946                    dependency_indices: unnest
947                        .dependency_indices
948                        .iter()
949                        .map(|c| *c as usize)
950                        .collect(),
951                    schema: Arc::new(convert_required!(unnest.schema)?),
952                    options: into_required!(unnest.options)?,
953                }))
954            }
955            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
956                let static_term = recursive_query_node
957                    .static_term
958                    .as_ref()
959                    .ok_or_else(|| DataFusionError::Internal(String::from(
960                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
961                    )))?
962                    .try_into_logical_plan(ctx, extension_codec)?;
963
964                let recursive_term = recursive_query_node
965                    .recursive_term
966                    .as_ref()
967                    .ok_or_else(|| DataFusionError::Internal(String::from(
968                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
969                    )))?
970                    .try_into_logical_plan(ctx, extension_codec)?;
971
972                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
973                    name: recursive_query_node.name.clone(),
974                    static_term: Arc::new(static_term),
975                    recursive_term: Arc::new(recursive_term),
976                    is_distinct: recursive_query_node.is_distinct,
977                }))
978            }
979            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
980                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
981                let schema = convert_required!(*schema)?;
982                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
983                LogicalPlanBuilder::scan(
984                    name.as_str(),
985                    provider_as_source(Arc::new(cte_work_table)),
986                    None,
987                )?
988                .build()
989            }
990            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
991                datafusion::logical_expr::DmlStatement::new(
992                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
993                    to_table_source(&dml_node.target, ctx, extension_codec)?,
994                    dml_node.dml_type().into(),
995                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
996                ),
997            )),
998        }
999    }
1000
1001    fn try_from_logical_plan(
1002        plan: &LogicalPlan,
1003        extension_codec: &dyn LogicalExtensionCodec,
1004    ) -> Result<Self>
1005    where
1006        Self: Sized,
1007    {
1008        match plan {
1009            LogicalPlan::Values(Values { values, .. }) => {
1010                let n_cols = if values.is_empty() {
1011                    0
1012                } else {
1013                    values[0].len()
1014                } as u64;
1015                let values_list =
1016                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1017                Ok(LogicalPlanNode {
1018                    logical_plan_type: Some(LogicalPlanType::Values(
1019                        protobuf::ValuesNode {
1020                            n_cols,
1021                            values_list,
1022                        },
1023                    )),
1024                })
1025            }
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                filters,
                projection,
                ..
            }) => {
                // The encoding depends on the concrete provider type:
                // `ListingTable` -> ListingScan, `ViewTable` -> ViewScan,
                // `CteWorkTable` -> CteWorkTableScan, and any other provider is
                // encoded by the extension codec as an opaque CustomScan.
                let provider = source_as_provider(source)?;
                let schema = provider.schema();
                let source = provider.as_any();

                // Projection indices are stored as column *names* taken from the
                // provider's schema.
                let projection = match projection {
                    None => None,
                    Some(columns) => {
                        let column_names = columns
                            .iter()
                            .map(|i| schema.field(*i).name().to_owned())
                            .collect();
                        Some(protobuf::ProjectionColumns {
                            columns: column_names,
                        })
                    }
                };
                let schema: protobuf::Schema = schema.as_ref().try_into()?;

                // Pushed-down filters are serialized up front; only the
                // ListingScan and CustomScan encodings below include them.
                let filters: Vec<protobuf::LogicalExprNode> =
                    serialize_exprs(filters, extension_codec)?;

                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                    let any = listing_table.options().format.as_any();
                    // Probe the listing table's format against each built-in
                    // format; every matching probe overwrites `maybe_some_type`.
                    // The parquet and avro probes exist only when their cargo
                    // features are enabled. An unrecognized format is an error.
                    let file_format_type = {
                        let mut maybe_some_type = None;

                        #[cfg(feature = "parquet")]
                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                            let options = parquet.options();
                            maybe_some_type =
                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                    options: Some(options.try_into()?),
                                }));
                        };

                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                            let options = csv.options();
                            maybe_some_type =
                                Some(FileFormatType::Csv(protobuf::CsvFormat {
                                    options: Some(options.try_into()?),
                                }));
                        }

                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                            let options = json.options();
                            maybe_some_type =
                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                    options: Some(options.try_into()?),
                                }))
                        }

                        #[cfg(feature = "avro")]
                        if any.is::<AvroFormat>() {
                            maybe_some_type =
                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                        }

                        if let Some(file_format_type) = maybe_some_type {
                            file_format_type
                        } else {
                            return Err(proto_error(format!(
                            "Error converting file format, {:?} is invalid as a datafusion format.",
                            listing_table.options().format
                        )));
                        }
                    };

                    let options = listing_table.options();

                    // Each declared file sort order becomes its own collection of
                    // serialized sort expressions.
                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                    for order in &options.file_sort_order {
                        let expr_vec = SortExprNodeCollection {
                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                        };
                        exprs_vec.push(expr_vec);
                    }

                    // NOTE(review): only the first element (`.0`) of each
                    // partition-column tuple is serialized (presumably the column
                    // name) — confirm the decoder recovers the rest. Also,
                    // `target_partitions as u32` truncates silently above u32::MAX.
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ListingScan(
                            protobuf::ListingTableScanNode {
                                file_format_type: Some(file_format_type),
                                table_name: Some(table_name.clone().into()),
                                collect_stat: options.collect_stat,
                                file_extension: options.file_extension.clone(),
                                table_partition_cols: options
                                    .table_partition_cols
                                    .iter()
                                    .map(|x| x.0.clone())
                                    .collect::<Vec<_>>(),
                                paths: listing_table
                                    .table_paths()
                                    .iter()
                                    .map(|x| x.to_string())
                                    .collect(),
                                schema: Some(schema),
                                projection,
                                filters,
                                target_partitions: options.target_partitions as u32,
                                file_sort_order: exprs_vec,
                            },
                        )),
                    })
                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                    // A view is encoded with its defining logical plan (serialized
                    // recursively); a missing SQL definition becomes "".
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                            protobuf::ViewTableScanNode {
                                table_name: Some(table_name.clone().into()),
                                input: Some(Box::new(
                                    LogicalPlanNode::try_from_logical_plan(
                                        view_table.logical_plan(),
                                        extension_codec,
                                    )?,
                                )),
                                schema: Some(schema),
                                projection,
                                definition: view_table
                                    .definition()
                                    .map(|s| s.to_string())
                                    .unwrap_or_default(),
                            },
                        ))),
                    })
                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
                {
                    // Re-derive the schema from the work table itself (this
                    // shadows the provider schema serialized above).
                    let name = cte_work_table.name().to_string();
                    let schema = cte_work_table.schema();
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                            protobuf::CteWorkTableScanNode {
                                name,
                                schema: Some(schema),
                            },
                        )),
                    })
                } else {
                    // Fallback: the extension codec encodes the provider into
                    // opaque bytes carried by a CustomScan node.
                    let mut bytes = vec![];
                    extension_codec
                        .try_encode_table_provider(table_name, provider, &mut bytes)
                        .map_err(|e| context!("Error serializing custom table", e))?;
                    let scan = CustomScan(CustomTableScanNode {
                        table_name: Some(table_name.clone().into()),
                        projection,
                        schema: Some(schema),
                        filters,
                        custom_table_data: bytes,
                    });
                    let node = LogicalPlanNode {
                        logical_plan_type: Some(scan),
                    };
                    Ok(node)
                }
            }
1187            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1188                Ok(LogicalPlanNode {
1189                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1190                        protobuf::ProjectionNode {
1191                            input: Some(Box::new(
1192                                LogicalPlanNode::try_from_logical_plan(
1193                                    input.as_ref(),
1194                                    extension_codec,
1195                                )?,
1196                            )),
1197                            expr: serialize_exprs(expr, extension_codec)?,
1198                            optional_alias: None,
1199                        },
1200                    ))),
1201                })
1202            }
1203            LogicalPlan::Filter(filter) => {
1204                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1205                    filter.input.as_ref(),
1206                    extension_codec,
1207                )?;
1208                Ok(LogicalPlanNode {
1209                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1210                        protobuf::SelectionNode {
1211                            input: Some(Box::new(input)),
1212                            expr: Some(serialize_expr(
1213                                &filter.predicate,
1214                                extension_codec,
1215                            )?),
1216                        },
1217                    ))),
1218                })
1219            }
1220            LogicalPlan::Distinct(Distinct::All(input)) => {
1221                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1222                    input.as_ref(),
1223                    extension_codec,
1224                )?;
1225                Ok(LogicalPlanNode {
1226                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1227                        protobuf::DistinctNode {
1228                            input: Some(Box::new(input)),
1229                        },
1230                    ))),
1231                })
1232            }
1233            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1234                on_expr,
1235                select_expr,
1236                sort_expr,
1237                input,
1238                ..
1239            })) => {
1240                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1241                    input.as_ref(),
1242                    extension_codec,
1243                )?;
1244                let sort_expr = match sort_expr {
1245                    None => vec![],
1246                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1247                };
1248                Ok(LogicalPlanNode {
1249                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1250                        protobuf::DistinctOnNode {
1251                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1252                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1253                            sort_expr,
1254                            input: Some(Box::new(input)),
1255                        },
1256                    ))),
1257                })
1258            }
1259            LogicalPlan::Window(Window {
1260                input, window_expr, ..
1261            }) => {
1262                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1263                    input.as_ref(),
1264                    extension_codec,
1265                )?;
1266                Ok(LogicalPlanNode {
1267                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1268                        protobuf::WindowNode {
1269                            input: Some(Box::new(input)),
1270                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1271                        },
1272                    ))),
1273                })
1274            }
1275            LogicalPlan::Aggregate(Aggregate {
1276                group_expr,
1277                aggr_expr,
1278                input,
1279                ..
1280            }) => {
1281                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1282                    input.as_ref(),
1283                    extension_codec,
1284                )?;
1285                Ok(LogicalPlanNode {
1286                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1287                        protobuf::AggregateNode {
1288                            input: Some(Box::new(input)),
1289                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1290                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1291                        },
1292                    ))),
1293                })
1294            }
1295            LogicalPlan::Join(Join {
1296                left,
1297                right,
1298                on,
1299                filter,
1300                join_type,
1301                join_constraint,
1302                null_equals_null,
1303                ..
1304            }) => {
1305                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1306                    left.as_ref(),
1307                    extension_codec,
1308                )?;
1309                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1310                    right.as_ref(),
1311                    extension_codec,
1312                )?;
1313                let (left_join_key, right_join_key) = on
1314                    .iter()
1315                    .map(|(l, r)| {
1316                        Ok((
1317                            serialize_expr(l, extension_codec)?,
1318                            serialize_expr(r, extension_codec)?,
1319                        ))
1320                    })
1321                    .collect::<Result<Vec<_>, ToProtoError>>()?
1322                    .into_iter()
1323                    .unzip();
1324                let join_type: protobuf::JoinType = join_type.to_owned().into();
1325                let join_constraint: protobuf::JoinConstraint =
1326                    join_constraint.to_owned().into();
1327                let filter = filter
1328                    .as_ref()
1329                    .map(|e| serialize_expr(e, extension_codec))
1330                    .map_or(Ok(None), |v| v.map(Some))?;
1331                Ok(LogicalPlanNode {
1332                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1333                        protobuf::JoinNode {
1334                            left: Some(Box::new(left)),
1335                            right: Some(Box::new(right)),
1336                            join_type: join_type.into(),
1337                            join_constraint: join_constraint.into(),
1338                            left_join_key,
1339                            right_join_key,
1340                            null_equals_null: *null_equals_null,
1341                            filter,
1342                        },
1343                    ))),
1344                })
1345            }
1346            LogicalPlan::Subquery(_) => {
1347                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1348            }
1349            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1350                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1351                    input.as_ref(),
1352                    extension_codec,
1353                )?;
1354                Ok(LogicalPlanNode {
1355                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1356                        protobuf::SubqueryAliasNode {
1357                            input: Some(Box::new(input)),
1358                            alias: Some((*alias).clone().into()),
1359                        },
1360                    ))),
1361                })
1362            }
1363            LogicalPlan::Limit(limit) => {
1364                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1365                    limit.input.as_ref(),
1366                    extension_codec,
1367                )?;
1368                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1369                    return Err(proto_error(
1370                        "LogicalPlan::Limit only supports literal skip values",
1371                    ));
1372                };
1373                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1374                    return Err(proto_error(
1375                        "LogicalPlan::Limit only supports literal fetch values",
1376                    ));
1377                };
1378
1379                Ok(LogicalPlanNode {
1380                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1381                        protobuf::LimitNode {
1382                            input: Some(Box::new(input)),
1383                            skip: skip as i64,
1384                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1385                        },
1386                    ))),
1387                })
1388            }
1389            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1390                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1391                    input.as_ref(),
1392                    extension_codec,
1393                )?;
1394                let sort_expr: Vec<protobuf::SortExprNode> =
1395                    serialize_sorts(expr, extension_codec)?;
1396                Ok(LogicalPlanNode {
1397                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1398                        protobuf::SortNode {
1399                            input: Some(Box::new(input)),
1400                            expr: sort_expr,
1401                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1402                        },
1403                    ))),
1404                })
1405            }
1406            LogicalPlan::Repartition(Repartition {
1407                input,
1408                partitioning_scheme,
1409            }) => {
1410                use datafusion::logical_expr::Partitioning;
1411                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1412                    input.as_ref(),
1413                    extension_codec,
1414                )?;
1415
1416                // Assumed common usize field was batch size.
1417                // u64 is used to avoid any nastiness with large values; most data clusters are probably uniformly 64-bit anyway.
1418                use protobuf::repartition_node::PartitionMethod;
1419
1420                let pb_partition_method = match partitioning_scheme {
1421                    Partitioning::Hash(exprs, partition_count) => {
1422                        PartitionMethod::Hash(protobuf::HashRepartition {
1423                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1424                            partition_count: *partition_count as u64,
1425                        })
1426                    }
1427                    Partitioning::RoundRobinBatch(partition_count) => {
1428                        PartitionMethod::RoundRobin(*partition_count as u64)
1429                    }
1430                    Partitioning::DistributeBy(_) => {
1431                        return not_impl_err!("DistributeBy")
1432                    }
1433                };
1434
1435                Ok(LogicalPlanNode {
1436                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1437                        protobuf::RepartitionNode {
1438                            input: Some(Box::new(input)),
1439                            partition_method: Some(pb_partition_method),
1440                        },
1441                    ))),
1442                })
1443            }
1444            LogicalPlan::EmptyRelation(EmptyRelation {
1445                produce_one_row, ..
1446            }) => Ok(LogicalPlanNode {
1447                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1448                    protobuf::EmptyRelationNode {
1449                        produce_one_row: *produce_one_row,
1450                    },
1451                )),
1452            }),
1453            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1454                CreateExternalTable {
1455                    name,
1456                    location,
1457                    file_type,
1458                    schema: df_schema,
1459                    table_partition_cols,
1460                    if_not_exists,
1461                    definition,
1462                    order_exprs,
1463                    unbounded,
1464                    options,
1465                    constraints,
1466                    column_defaults,
1467                    temporary,
1468                },
1469            )) => {
1470                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1471                for order in order_exprs {
1472                    let temp = SortExprNodeCollection {
1473                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1474                    };
1475                    converted_order_exprs.push(temp);
1476                }
1477
1478                let mut converted_column_defaults =
1479                    HashMap::with_capacity(column_defaults.len());
1480                for (col_name, expr) in column_defaults {
1481                    converted_column_defaults
1482                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1483                }
1484
1485                Ok(LogicalPlanNode {
1486                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1487                        protobuf::CreateExternalTableNode {
1488                            name: Some(name.clone().into()),
1489                            location: location.clone(),
1490                            file_type: file_type.clone(),
1491                            schema: Some(df_schema.try_into()?),
1492                            table_partition_cols: table_partition_cols.clone(),
1493                            if_not_exists: *if_not_exists,
1494                            temporary: *temporary,
1495                            order_exprs: converted_order_exprs,
1496                            definition: definition.clone().unwrap_or_default(),
1497                            unbounded: *unbounded,
1498                            options: options.clone(),
1499                            constraints: Some(constraints.clone().into()),
1500                            column_defaults: converted_column_defaults,
1501                        },
1502                    )),
1503                })
1504            }
1505            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1506                name,
1507                input,
1508                or_replace,
1509                definition,
1510                temporary,
1511            })) => Ok(LogicalPlanNode {
1512                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1513                    protobuf::CreateViewNode {
1514                        name: Some(name.clone().into()),
1515                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1516                            input,
1517                            extension_codec,
1518                        )?)),
1519                        or_replace: *or_replace,
1520                        temporary: *temporary,
1521                        definition: definition.clone().unwrap_or_default(),
1522                    },
1523                ))),
1524            }),
1525            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1526                CreateCatalogSchema {
1527                    schema_name,
1528                    if_not_exists,
1529                    schema: df_schema,
1530                },
1531            )) => Ok(LogicalPlanNode {
1532                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1533                    protobuf::CreateCatalogSchemaNode {
1534                        schema_name: schema_name.clone(),
1535                        if_not_exists: *if_not_exists,
1536                        schema: Some(df_schema.try_into()?),
1537                    },
1538                )),
1539            }),
1540            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1541                catalog_name,
1542                if_not_exists,
1543                schema: df_schema,
1544            })) => Ok(LogicalPlanNode {
1545                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1546                    protobuf::CreateCatalogNode {
1547                        catalog_name: catalog_name.clone(),
1548                        if_not_exists: *if_not_exists,
1549                        schema: Some(df_schema.try_into()?),
1550                    },
1551                )),
1552            }),
1553            LogicalPlan::Analyze(a) => {
1554                let input = LogicalPlanNode::try_from_logical_plan(
1555                    a.input.as_ref(),
1556                    extension_codec,
1557                )?;
1558                Ok(LogicalPlanNode {
1559                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1560                        protobuf::AnalyzeNode {
1561                            input: Some(Box::new(input)),
1562                            verbose: a.verbose,
1563                        },
1564                    ))),
1565                })
1566            }
1567            LogicalPlan::Explain(a) => {
1568                let input = LogicalPlanNode::try_from_logical_plan(
1569                    a.plan.as_ref(),
1570                    extension_codec,
1571                )?;
1572                Ok(LogicalPlanNode {
1573                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1574                        protobuf::ExplainNode {
1575                            input: Some(Box::new(input)),
1576                            verbose: a.verbose,
1577                        },
1578                    ))),
1579                })
1580            }
1581            LogicalPlan::Union(union) => {
1582                let inputs: Vec<LogicalPlanNode> = union
1583                    .inputs
1584                    .iter()
1585                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1586                    .collect::<Result<_>>()?;
1587                Ok(LogicalPlanNode {
1588                    logical_plan_type: Some(LogicalPlanType::Union(
1589                        protobuf::UnionNode { inputs },
1590                    )),
1591                })
1592            }
1593            LogicalPlan::Extension(extension) => {
1594                let mut buf: Vec<u8> = vec![];
1595                extension_codec.try_encode(extension, &mut buf)?;
1596
1597                let inputs: Vec<LogicalPlanNode> = extension
1598                    .node
1599                    .inputs()
1600                    .iter()
1601                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1602                    .collect::<Result<_>>()?;
1603
1604                Ok(LogicalPlanNode {
1605                    logical_plan_type: Some(LogicalPlanType::Extension(
1606                        LogicalExtensionNode { node: buf, inputs },
1607                    )),
1608                })
1609            }
1610            LogicalPlan::Statement(Statement::Prepare(Prepare {
1611                name,
1612                data_types,
1613                input,
1614            })) => {
1615                let input =
1616                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1617                Ok(LogicalPlanNode {
1618                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1619                        protobuf::PrepareNode {
1620                            name: name.clone(),
1621                            data_types: data_types
1622                                .iter()
1623                                .map(|t| t.try_into())
1624                                .collect::<Result<Vec<_>, _>>()?,
1625                            input: Some(Box::new(input)),
1626                        },
1627                    ))),
1628                })
1629            }
1630            LogicalPlan::Unnest(Unnest {
1631                input,
1632                exec_columns,
1633                list_type_columns,
1634                struct_type_columns,
1635                dependency_indices,
1636                schema,
1637                options,
1638            }) => {
1639                let input =
1640                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1641                let proto_unnest_list_items = list_type_columns
1642                    .iter()
1643                    .map(|(index, ul)| ColumnUnnestListItem {
1644                        input_index: *index as _,
1645                        recursion: Some(ColumnUnnestListRecursion {
1646                            output_column: Some(ul.output_column.to_owned().into()),
1647                            depth: ul.depth as _,
1648                        }),
1649                    })
1650                    .collect();
1651                Ok(LogicalPlanNode {
1652                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1653                        protobuf::UnnestNode {
1654                            input: Some(Box::new(input)),
1655                            exec_columns: exec_columns
1656                                .iter()
1657                                .map(|col| col.into())
1658                                .collect(),
1659                            list_type_columns: proto_unnest_list_items,
1660                            struct_type_columns: struct_type_columns
1661                                .iter()
1662                                .map(|c| *c as u64)
1663                                .collect(),
1664                            dependency_indices: dependency_indices
1665                                .iter()
1666                                .map(|c| *c as u64)
1667                                .collect(),
1668                            schema: Some(schema.try_into()?),
1669                            options: Some(options.into()),
1670                        },
1671                    ))),
1672                })
1673            }
1674            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1675                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1676            )),
1677            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1678                "LogicalPlan serde is not yet implemented for CreateIndex",
1679            )),
1680            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1681                "LogicalPlan serde is not yet implemented for DropTable",
1682            )),
1683            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1684                name,
1685                if_exists,
1686                schema,
1687            })) => Ok(LogicalPlanNode {
1688                logical_plan_type: Some(LogicalPlanType::DropView(
1689                    protobuf::DropViewNode {
1690                        name: Some(name.clone().into()),
1691                        if_exists: *if_exists,
1692                        schema: Some(schema.try_into()?),
1693                    },
1694                )),
1695            }),
1696            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1697                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1698            )),
1699            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1700                "LogicalPlan serde is not yet implemented for CreateFunction",
1701            )),
1702            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1703                "LogicalPlan serde is not yet implemented for DropFunction",
1704            )),
1705            LogicalPlan::Statement(_) => Err(proto_error(
1706                "LogicalPlan serde is not yet implemented for Statement",
1707            )),
1708            LogicalPlan::Dml(DmlStatement {
1709                table_name,
1710                target,
1711                op,
1712                input,
1713                ..
1714            }) => {
1715                let input =
1716                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1717                let dml_type: dml_node::Type = op.into();
1718                Ok(LogicalPlanNode {
1719                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1720                        input: Some(Box::new(input)),
1721                        target: Some(Box::new(from_table_source(
1722                            table_name.clone(),
1723                            Arc::clone(target),
1724                            extension_codec,
1725                        )?)),
1726                        table_name: Some(table_name.clone().into()),
1727                        dml_type: dml_type.into(),
1728                    }))),
1729                })
1730            }
1731            LogicalPlan::Copy(dml::CopyTo {
1732                input,
1733                output_url,
1734                file_type,
1735                partition_by,
1736                ..
1737            }) => {
1738                let input =
1739                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1740                let mut buf = Vec::new();
1741                extension_codec
1742                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1743
1744                Ok(LogicalPlanNode {
1745                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1746                        protobuf::CopyToNode {
1747                            input: Some(Box::new(input)),
1748                            output_url: output_url.to_string(),
1749                            file_type: buf,
1750                            partition_by: partition_by.clone(),
1751                        },
1752                    ))),
1753                })
1754            }
1755            LogicalPlan::DescribeTable(_) => Err(proto_error(
1756                "LogicalPlan serde is not yet implemented for DescribeTable",
1757            )),
1758            LogicalPlan::RecursiveQuery(recursive) => {
1759                let static_term = LogicalPlanNode::try_from_logical_plan(
1760                    recursive.static_term.as_ref(),
1761                    extension_codec,
1762                )?;
1763                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1764                    recursive.recursive_term.as_ref(),
1765                    extension_codec,
1766                )?;
1767
1768                Ok(LogicalPlanNode {
1769                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1770                        protobuf::RecursiveQueryNode {
1771                            name: recursive.name.clone(),
1772                            static_term: Some(Box::new(static_term)),
1773                            recursive_term: Some(Box::new(recursive_term)),
1774                            is_distinct: recursive.is_distinct,
1775                        },
1776                    ))),
1777                })
1778            }
1779        }
1780    }
1781}