datafusion_proto/logical_plan/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "parquet")]
39use datafusion::datasource::file_format::parquet::ParquetFormat;
40use datafusion::datasource::file_format::{
41    file_type_to_format, format_as_file_type, FileFormatFactory,
42};
43use datafusion::{
44    datasource::{
45        file_format::{
46            avro::AvroFormat, csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat,
47            FileFormat,
48        },
49        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
50        view::ViewTable,
51        TableProvider,
52    },
53    datasource::{provider_as_source, source_as_provider},
54    prelude::SessionContext,
55};
56use datafusion_common::file_options::file_type::FileType;
57use datafusion_common::{
58    context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
59    DataFusionError, Result, TableReference, ToDFSchema,
60};
61use datafusion_expr::{
62    dml,
63    logical_plan::{
64        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
65        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
66        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
67        SubqueryAlias, TableScan, Values, Window,
68    },
69    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
70    Statement, WindowUDF,
71};
72use datafusion_expr::{
73    AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
74    TableSource, Unnest,
75};
76
77use self::to_proto::{serialize_expr, serialize_exprs};
78use crate::logical_plan::to_proto::serialize_sorts;
79use prost::bytes::BufMut;
80use prost::Message;
81
82pub mod file_formats;
83pub mod from_proto;
84pub mod to_proto;
85
86pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
87    fn try_decode(buf: &[u8]) -> Result<Self>
88    where
89        Self: Sized;
90
91    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
92    where
93        B: BufMut,
94        Self: Sized;
95
96    fn try_into_logical_plan(
97        &self,
98        ctx: &SessionContext,
99        extension_codec: &dyn LogicalExtensionCodec,
100    ) -> Result<LogicalPlan>;
101
102    fn try_from_logical_plan(
103        plan: &LogicalPlan,
104        extension_codec: &dyn LogicalExtensionCodec,
105    ) -> Result<Self>
106    where
107        Self: Sized;
108}
109
110pub trait LogicalExtensionCodec: Debug + Send + Sync {
111    fn try_decode(
112        &self,
113        buf: &[u8],
114        inputs: &[LogicalPlan],
115        ctx: &SessionContext,
116    ) -> Result<Extension>;
117
118    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
119
120    fn try_decode_table_provider(
121        &self,
122        buf: &[u8],
123        table_ref: &TableReference,
124        schema: SchemaRef,
125        ctx: &SessionContext,
126    ) -> Result<Arc<dyn TableProvider>>;
127
128    fn try_encode_table_provider(
129        &self,
130        table_ref: &TableReference,
131        node: Arc<dyn TableProvider>,
132        buf: &mut Vec<u8>,
133    ) -> Result<()>;
134
135    fn try_decode_file_format(
136        &self,
137        _buf: &[u8],
138        _ctx: &SessionContext,
139    ) -> Result<Arc<dyn FileFormatFactory>> {
140        not_impl_err!("LogicalExtensionCodec is not provided for file format")
141    }
142
143    fn try_encode_file_format(
144        &self,
145        _buf: &mut Vec<u8>,
146        _node: Arc<dyn FileFormatFactory>,
147    ) -> Result<()> {
148        Ok(())
149    }
150
151    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
152        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
153    }
154
155    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
156        Ok(())
157    }
158
159    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
160        not_impl_err!(
161            "LogicalExtensionCodec is not provided for aggregate function {name}"
162        )
163    }
164
165    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
166        Ok(())
167    }
168
169    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
170        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
171    }
172
173    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
174        Ok(())
175    }
176}
177
178#[derive(Debug, Clone)]
179pub struct DefaultLogicalExtensionCodec {}
180
181impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
182    fn try_decode(
183        &self,
184        _buf: &[u8],
185        _inputs: &[LogicalPlan],
186        _ctx: &SessionContext,
187    ) -> Result<Extension> {
188        not_impl_err!("LogicalExtensionCodec is not provided")
189    }
190
191    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
192        not_impl_err!("LogicalExtensionCodec is not provided")
193    }
194
195    fn try_decode_table_provider(
196        &self,
197        _buf: &[u8],
198        _table_ref: &TableReference,
199        _schema: SchemaRef,
200        _ctx: &SessionContext,
201    ) -> Result<Arc<dyn TableProvider>> {
202        not_impl_err!("LogicalExtensionCodec is not provided")
203    }
204
205    fn try_encode_table_provider(
206        &self,
207        _table_ref: &TableReference,
208        _node: Arc<dyn TableProvider>,
209        _buf: &mut Vec<u8>,
210    ) -> Result<()> {
211        not_impl_err!("LogicalExtensionCodec is not provided")
212    }
213}
214
/// Decodes a required, boxed child-plan field of a protobuf message into a
/// `LogicalPlan`.
///
/// * `$PB` is an optional boxed plan node, e.g. `Option<Box<LogicalPlanNode>>`.
/// * `$CTX` and `$CODEC` are forwarded to `AsLogicalPlan::try_into_logical_plan`.
///
/// The macro evaluates to a `Result<LogicalPlan>`. It is an error when `$PB`
/// is `None`.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        // `$crate`-qualified paths make the expansion independent of what the
        // call site has imported: previously it needed both `proto_error` and
        // the `AsLogicalPlan` trait in scope, which breaks for an exported macro.
        match $PB.as_ref() {
            Some(field) => $crate::logical_plan::AsLogicalPlan::try_into_logical_plan(
                &**field, $CTX, $CODEC,
            ),
            None => Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            )),
        }
    }};
}
225
226fn from_table_reference(
227    table_ref: Option<&protobuf::TableReference>,
228    error_context: &str,
229) -> Result<TableReference> {
230    let table_ref = table_ref.ok_or_else(|| {
231        DataFusionError::Internal(format!(
232            "Protobuf deserialization error, {error_context} was missing required field name."
233        ))
234    })?;
235
236    Ok(table_ref.clone().try_into()?)
237}
238
239/// Converts [LogicalPlan::TableScan] to [TableSource]
240/// method to be used to deserialize nodes
241/// serialized by [from_table_source]
242fn to_table_source(
243    node: &Option<Box<LogicalPlanNode>>,
244    ctx: &SessionContext,
245    extension_codec: &dyn LogicalExtensionCodec,
246) -> Result<Arc<dyn TableSource>> {
247    if let Some(node) = node {
248        match node.try_into_logical_plan(ctx, extension_codec)? {
249            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
250            _ => plan_err!("expected TableScan node"),
251        }
252    } else {
253        plan_err!("LogicalPlanNode should be provided")
254    }
255}
256
257/// converts [TableSource] to [LogicalPlan::TableScan]
258/// using [LogicalPlan::TableScan] was the best approach to
259/// serialize [TableSource] to [LogicalPlan::TableScan]
260fn from_table_source(
261    table_name: TableReference,
262    target: Arc<dyn TableSource>,
263    extension_codec: &dyn LogicalExtensionCodec,
264) -> Result<LogicalPlanNode> {
265    let projected_schema = target.schema().to_dfschema_ref()?;
266    let r = LogicalPlan::TableScan(TableScan {
267        table_name,
268        source: target,
269        projection: None,
270        projected_schema,
271        filters: vec![],
272        fetch: None,
273    });
274
275    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
276}
277
278impl AsLogicalPlan for LogicalPlanNode {
279    fn try_decode(buf: &[u8]) -> Result<Self>
280    where
281        Self: Sized,
282    {
283        LogicalPlanNode::decode(buf).map_err(|e| {
284            DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
285        })
286    }
287
288    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
289    where
290        B: BufMut,
291        Self: Sized,
292    {
293        self.encode(buf).map_err(|e| {
294            DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
295        })
296    }
297
298    fn try_into_logical_plan(
299        &self,
300        ctx: &SessionContext,
301        extension_codec: &dyn LogicalExtensionCodec,
302    ) -> Result<LogicalPlan> {
303        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
304            proto_error(format!(
305                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
306            ))
307        })?;
308        match plan {
309            LogicalPlanType::Values(values) => {
310                let n_cols = values.n_cols as usize;
311                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
312                    Ok(Vec::new())
313                } else if values.values_list.len() % n_cols != 0 {
314                    internal_err!(
315                        "Invalid values list length, expect {} to be divisible by {}",
316                        values.values_list.len(),
317                        n_cols
318                    )
319                } else {
320                    values
321                        .values_list
322                        .chunks_exact(n_cols)
323                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
324                        .collect::<Result<Vec<_>, _>>()
325                        .map_err(|e| e.into())
326                }?;
327
328                LogicalPlanBuilder::values(values)?.build()
329            }
330            LogicalPlanType::Projection(projection) => {
331                let input: LogicalPlan =
332                    into_logical_plan!(projection.input, ctx, extension_codec)?;
333                let expr: Vec<Expr> =
334                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
335
336                let new_proj = project(input, expr)?;
337                match projection.optional_alias.as_ref() {
338                    Some(a) => match a {
339                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
340                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
341                                Arc::new(new_proj),
342                                alias.clone(),
343                            )?))
344                        }
345                    },
346                    _ => Ok(new_proj),
347                }
348            }
349            LogicalPlanType::Selection(selection) => {
350                let input: LogicalPlan =
351                    into_logical_plan!(selection.input, ctx, extension_codec)?;
352                let expr: Expr = selection
353                    .expr
354                    .as_ref()
355                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
356                    .transpose()?
357                    .ok_or_else(|| {
358                        DataFusionError::Internal("expression required".to_string())
359                    })?;
360                // .try_into()?;
361                LogicalPlanBuilder::from(input).filter(expr)?.build()
362            }
363            LogicalPlanType::Window(window) => {
364                let input: LogicalPlan =
365                    into_logical_plan!(window.input, ctx, extension_codec)?;
366                let window_expr =
367                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
368                LogicalPlanBuilder::from(input).window(window_expr)?.build()
369            }
370            LogicalPlanType::Aggregate(aggregate) => {
371                let input: LogicalPlan =
372                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
373                let group_expr =
374                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
375                let aggr_expr =
376                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
377                LogicalPlanBuilder::from(input)
378                    .aggregate(group_expr, aggr_expr)?
379                    .build()
380            }
381            LogicalPlanType::ListingScan(scan) => {
382                let schema: Schema = convert_required!(scan.schema)?;
383
384                let mut projection = None;
385                if let Some(columns) = &scan.projection {
386                    let column_indices = columns
387                        .columns
388                        .iter()
389                        .map(|name| schema.index_of(name))
390                        .collect::<Result<Vec<usize>, _>>()?;
391                    projection = Some(column_indices);
392                }
393
394                let filters =
395                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
396
397                let mut all_sort_orders = vec![];
398                for order in &scan.file_sort_order {
399                    all_sort_orders.push(from_proto::parse_sorts(
400                        &order.sort_expr_nodes,
401                        ctx,
402                        extension_codec,
403                    )?)
404                }
405
406                let file_format: Arc<dyn FileFormat> =
407                    match scan.file_format_type.as_ref().ok_or_else(|| {
408                        proto_error(format!(
409                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
410                        ))
411                    })? {
412                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
413                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
414                            #[cfg(feature = "parquet")]
415                            {
416                                let mut parquet = ParquetFormat::default();
417                                if let Some(options) = options {
418                                    parquet = parquet.with_options(options.try_into()?)
419                                }
420                                Arc::new(parquet)
421                            }
422                            #[cfg(not(feature = "parquet"))]
423                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
424                        }
425                        FileFormatType::Csv(protobuf::CsvFormat {
426                            options
427                        }) => {
428                            let mut csv = CsvFormat::default();
429                            if let Some(options) = options {
430                                csv = csv.with_options(options.try_into()?)
431                            }
432                            Arc::new(csv)
433                        },
434                        FileFormatType::Json(protobuf::NdJsonFormat {
435                            options
436                        }) => {
437                            let mut json = OtherNdJsonFormat::default();
438                            if let Some(options) = options {
439                                json = json.with_options(options.try_into()?)
440                            }
441                            Arc::new(json)
442                        }
443                        FileFormatType::Avro(..) => Arc::new(AvroFormat),
444                    };
445
446                let table_paths = &scan
447                    .paths
448                    .iter()
449                    .map(ListingTableUrl::parse)
450                    .collect::<Result<Vec<_>, _>>()?;
451
452                let options = ListingOptions::new(file_format)
453                    .with_file_extension(&scan.file_extension)
454                    .with_table_partition_cols(
455                        scan.table_partition_cols
456                            .iter()
457                            .map(|col| {
458                                (
459                                    col.clone(),
460                                    schema
461                                        .field_with_name(col)
462                                        .unwrap()
463                                        .data_type()
464                                        .clone(),
465                                )
466                            })
467                            .collect(),
468                    )
469                    .with_collect_stat(scan.collect_stat)
470                    .with_target_partitions(scan.target_partitions as usize)
471                    .with_file_sort_order(all_sort_orders);
472
473                let config =
474                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
475                        .with_listing_options(options)
476                        .with_schema(Arc::new(schema));
477
478                let provider = ListingTable::try_new(config)?.with_cache(
479                    ctx.state()
480                        .runtime_env()
481                        .cache_manager
482                        .get_file_statistic_cache(),
483                );
484
485                let table_name =
486                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
487
488                LogicalPlanBuilder::scan_with_filters(
489                    table_name,
490                    provider_as_source(Arc::new(provider)),
491                    projection,
492                    filters,
493                )?
494                .build()
495            }
496            LogicalPlanType::CustomScan(scan) => {
497                let schema: Schema = convert_required!(scan.schema)?;
498                let schema = Arc::new(schema);
499                let mut projection = None;
500                if let Some(columns) = &scan.projection {
501                    let column_indices = columns
502                        .columns
503                        .iter()
504                        .map(|name| schema.index_of(name))
505                        .collect::<Result<Vec<usize>, _>>()?;
506                    projection = Some(column_indices);
507                }
508
509                let filters =
510                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
511
512                let table_name =
513                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
514
515                let provider = extension_codec.try_decode_table_provider(
516                    &scan.custom_table_data,
517                    &table_name,
518                    schema,
519                    ctx,
520                )?;
521
522                LogicalPlanBuilder::scan_with_filters(
523                    table_name,
524                    provider_as_source(provider),
525                    projection,
526                    filters,
527                )?
528                .build()
529            }
530            LogicalPlanType::Sort(sort) => {
531                let input: LogicalPlan =
532                    into_logical_plan!(sort.input, ctx, extension_codec)?;
533                let sort_expr: Vec<SortExpr> =
534                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
535                let fetch: Option<usize> = sort.fetch.try_into().ok();
536                LogicalPlanBuilder::from(input)
537                    .sort_with_limit(sort_expr, fetch)?
538                    .build()
539            }
540            LogicalPlanType::Repartition(repartition) => {
541                use datafusion::logical_expr::Partitioning;
542                let input: LogicalPlan =
543                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
544                use protobuf::repartition_node::PartitionMethod;
545                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
546                    internal_datafusion_err!(
547                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
548                    )
549                })?;
550
551                let partitioning_scheme = match pb_partition_method {
552                    PartitionMethod::Hash(protobuf::HashRepartition {
553                        hash_expr: pb_hash_expr,
554                        partition_count,
555                    }) => Partitioning::Hash(
556                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
557                        *partition_count as usize,
558                    ),
559                    PartitionMethod::RoundRobin(partition_count) => {
560                        Partitioning::RoundRobinBatch(*partition_count as usize)
561                    }
562                };
563
564                LogicalPlanBuilder::from(input)
565                    .repartition(partitioning_scheme)?
566                    .build()
567            }
568            LogicalPlanType::EmptyRelation(empty_relation) => {
569                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
570            }
571            LogicalPlanType::CreateExternalTable(create_extern_table) => {
572                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
573                    DataFusionError::Internal(String::from(
574                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
575                    ))
576                })?;
577
578                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
579                    DataFusionError::Internal(String::from(
580                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
581                    ))
582                })?;
583                let definition = if !create_extern_table.definition.is_empty() {
584                    Some(create_extern_table.definition.clone())
585                } else {
586                    None
587                };
588
589                let file_type = create_extern_table.file_type.as_str();
590                if ctx.table_factory(file_type).is_none() {
591                    internal_err!("No TableProviderFactory for file type: {file_type}")?
592                }
593
594                let mut order_exprs = vec![];
595                for expr in &create_extern_table.order_exprs {
596                    order_exprs.push(from_proto::parse_sorts(
597                        &expr.sort_expr_nodes,
598                        ctx,
599                        extension_codec,
600                    )?);
601                }
602
603                let mut column_defaults =
604                    HashMap::with_capacity(create_extern_table.column_defaults.len());
605                for (col_name, expr) in &create_extern_table.column_defaults {
606                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
607                    column_defaults.insert(col_name.clone(), expr);
608                }
609
610                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
611                    CreateExternalTable {
612                        schema: pb_schema.try_into()?,
613                        name: from_table_reference(
614                            create_extern_table.name.as_ref(),
615                            "CreateExternalTable",
616                        )?,
617                        location: create_extern_table.location.clone(),
618                        file_type: create_extern_table.file_type.clone(),
619                        table_partition_cols: create_extern_table
620                            .table_partition_cols
621                            .clone(),
622                        order_exprs,
623                        if_not_exists: create_extern_table.if_not_exists,
624                        temporary: create_extern_table.temporary,
625                        definition,
626                        unbounded: create_extern_table.unbounded,
627                        options: create_extern_table.options.clone(),
628                        constraints: constraints.into(),
629                        column_defaults,
630                    },
631                )))
632            }
633            LogicalPlanType::CreateView(create_view) => {
634                let plan = create_view
635                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
636                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
637                )))?
638                    .try_into_logical_plan(ctx, extension_codec)?;
639                let definition = if !create_view.definition.is_empty() {
640                    Some(create_view.definition.clone())
641                } else {
642                    None
643                };
644
645                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
647                    temporary: create_view.temporary,
648                    input: Arc::new(plan),
649                    or_replace: create_view.or_replace,
650                    definition,
651                })))
652            }
653            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
654                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
655                    DataFusionError::Internal(String::from(
656                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
657                    ))
658                })?;
659
660                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
661                    CreateCatalogSchema {
662                        schema_name: create_catalog_schema.schema_name.clone(),
663                        if_not_exists: create_catalog_schema.if_not_exists,
664                        schema: pb_schema.try_into()?,
665                    },
666                )))
667            }
668            LogicalPlanType::CreateCatalog(create_catalog) => {
669                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
670                    DataFusionError::Internal(String::from(
671                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
672                    ))
673                })?;
674
675                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
676                    CreateCatalog {
677                        catalog_name: create_catalog.catalog_name.clone(),
678                        if_not_exists: create_catalog.if_not_exists,
679                        schema: pb_schema.try_into()?,
680                    },
681                )))
682            }
683            LogicalPlanType::Analyze(analyze) => {
684                let input: LogicalPlan =
685                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
686                LogicalPlanBuilder::from(input)
687                    .explain(analyze.verbose, true)?
688                    .build()
689            }
690            LogicalPlanType::Explain(explain) => {
691                let input: LogicalPlan =
692                    into_logical_plan!(explain.input, ctx, extension_codec)?;
693                LogicalPlanBuilder::from(input)
694                    .explain(explain.verbose, false)?
695                    .build()
696            }
697            LogicalPlanType::SubqueryAlias(aliased_relation) => {
698                let input: LogicalPlan =
699                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
700                let alias = from_table_reference(
701                    aliased_relation.alias.as_ref(),
702                    "SubqueryAlias",
703                )?;
704                LogicalPlanBuilder::from(input).alias(alias)?.build()
705            }
706            LogicalPlanType::Limit(limit) => {
707                let input: LogicalPlan =
708                    into_logical_plan!(limit.input, ctx, extension_codec)?;
709                let skip = limit.skip.max(0) as usize;
710
711                let fetch = if limit.fetch < 0 {
712                    None
713                } else {
714                    Some(limit.fetch as usize)
715                };
716
717                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
718            }
719            LogicalPlanType::Join(join) => {
720                let left_keys: Vec<Expr> =
721                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
722                let right_keys: Vec<Expr> =
723                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
724                let join_type =
725                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
726                        proto_error(format!(
727                            "Received a JoinNode message with unknown JoinType {}",
728                            join.join_type
729                        ))
730                    })?;
731                let join_constraint = protobuf::JoinConstraint::try_from(
732                    join.join_constraint,
733                )
734                .map_err(|_| {
735                    proto_error(format!(
736                        "Received a JoinNode message with unknown JoinConstraint {}",
737                        join.join_constraint
738                    ))
739                })?;
740                let filter: Option<Expr> = join
741                    .filter
742                    .as_ref()
743                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
744                    .map_or(Ok(None), |v| v.map(Some))?;
745
746                let builder = LogicalPlanBuilder::from(into_logical_plan!(
747                    join.left,
748                    ctx,
749                    extension_codec
750                )?);
751                let builder = match join_constraint.into() {
752                    JoinConstraint::On => builder.join_with_expr_keys(
753                        into_logical_plan!(join.right, ctx, extension_codec)?,
754                        join_type.into(),
755                        (left_keys, right_keys),
756                        filter,
757                    )?,
758                    JoinConstraint::Using => {
759                        // The equijoin keys in using-join must be column.
760                        let using_keys = left_keys
761                            .into_iter()
762                            .map(|key| {
763                                key.try_as_col().cloned()
764                                    .ok_or_else(|| internal_datafusion_err!(
765                                        "Using join keys must be column references, got: {key:?}"
766                                    ))
767                            })
768                            .collect::<Result<Vec<_>, _>>()?;
769                        builder.join_using(
770                            into_logical_plan!(join.right, ctx, extension_codec)?,
771                            join_type.into(),
772                            using_keys,
773                        )?
774                    }
775                };
776
777                builder.build()
778            }
779            LogicalPlanType::Union(union) => {
780                if union.inputs.len() < 2 {
781                    return  Err( DataFusionError::Internal(String::from(
782                        "Protobuf deserialization error, Union was require at least two input.",
783                    )));
784                }
785                let (first, rest) = union.inputs.split_first().unwrap();
786                let mut builder = LogicalPlanBuilder::from(
787                    first.try_into_logical_plan(ctx, extension_codec)?,
788                );
789
790                for i in rest {
791                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
792                    builder = builder.union(plan)?;
793                }
794                builder.build()
795            }
796            LogicalPlanType::CrossJoin(crossjoin) => {
797                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
798                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
799
800                LogicalPlanBuilder::from(left).cross_join(right)?.build()
801            }
802            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
803                let input_plans: Vec<LogicalPlan> = inputs
804                    .iter()
805                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
806                    .collect::<Result<_>>()?;
807
808                let extension_node =
809                    extension_codec.try_decode(node, &input_plans, ctx)?;
810                Ok(LogicalPlan::Extension(extension_node))
811            }
812            LogicalPlanType::Distinct(distinct) => {
813                let input: LogicalPlan =
814                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
815                LogicalPlanBuilder::from(input).distinct()?.build()
816            }
817            LogicalPlanType::DistinctOn(distinct_on) => {
818                let input: LogicalPlan =
819                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
820                let on_expr =
821                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
822                let select_expr = from_proto::parse_exprs(
823                    &distinct_on.select_expr,
824                    ctx,
825                    extension_codec,
826                )?;
827                let sort_expr = match distinct_on.sort_expr.len() {
828                    0 => None,
829                    _ => Some(from_proto::parse_sorts(
830                        &distinct_on.sort_expr,
831                        ctx,
832                        extension_codec,
833                    )?),
834                };
835                LogicalPlanBuilder::from(input)
836                    .distinct_on(on_expr, select_expr, sort_expr)?
837                    .build()
838            }
839            LogicalPlanType::ViewScan(scan) => {
840                let schema: Schema = convert_required!(scan.schema)?;
841
842                let mut projection = None;
843                if let Some(columns) = &scan.projection {
844                    let column_indices = columns
845                        .columns
846                        .iter()
847                        .map(|name| schema.index_of(name))
848                        .collect::<Result<Vec<usize>, _>>()?;
849                    projection = Some(column_indices);
850                }
851
852                let input: LogicalPlan =
853                    into_logical_plan!(scan.input, ctx, extension_codec)?;
854
855                let definition = if !scan.definition.is_empty() {
856                    Some(scan.definition.clone())
857                } else {
858                    None
859                };
860
861                let provider = ViewTable::try_new(input, definition)?;
862
863                let table_name =
864                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
865
866                LogicalPlanBuilder::scan(
867                    table_name,
868                    provider_as_source(Arc::new(provider)),
869                    projection,
870                )?
871                .build()
872            }
873            LogicalPlanType::Prepare(prepare) => {
874                let input: LogicalPlan =
875                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
876                let data_types: Vec<DataType> = prepare
877                    .data_types
878                    .iter()
879                    .map(DataType::try_from)
880                    .collect::<Result<_, _>>()?;
881                LogicalPlanBuilder::from(input)
882                    .prepare(prepare.name.clone(), data_types)?
883                    .build()
884            }
885            LogicalPlanType::DropView(dropview) => {
886                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
887                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
888                    if_exists: dropview.if_exists,
889                    schema: Arc::new(convert_required!(dropview.schema)?),
890                })))
891            }
892            LogicalPlanType::CopyTo(copy) => {
893                let input: LogicalPlan =
894                    into_logical_plan!(copy.input, ctx, extension_codec)?;
895
896                let file_type: Arc<dyn FileType> = format_as_file_type(
897                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
898                );
899
900                Ok(LogicalPlan::Copy(dml::CopyTo {
901                    input: Arc::new(input),
902                    output_url: copy.output_url.clone(),
903                    partition_by: copy.partition_by.clone(),
904                    file_type,
905                    options: Default::default(),
906                }))
907            }
908            LogicalPlanType::Unnest(unnest) => {
909                let input: LogicalPlan =
910                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
911                Ok(LogicalPlan::Unnest(Unnest {
912                    input: Arc::new(input),
913                    exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
914                    list_type_columns: unnest
915                        .list_type_columns
916                        .iter()
917                        .map(|c| {
918                            let recursion_item = c.recursion.as_ref().unwrap();
919                            (
920                                c.input_index as _,
921                                ColumnUnnestList {
922                                    output_column: recursion_item
923                                        .output_column
924                                        .as_ref()
925                                        .unwrap()
926                                        .into(),
927                                    depth: recursion_item.depth as _,
928                                },
929                            )
930                        })
931                        .collect(),
932                    struct_type_columns: unnest
933                        .struct_type_columns
934                        .iter()
935                        .map(|c| *c as usize)
936                        .collect(),
937                    dependency_indices: unnest
938                        .dependency_indices
939                        .iter()
940                        .map(|c| *c as usize)
941                        .collect(),
942                    schema: Arc::new(convert_required!(unnest.schema)?),
943                    options: into_required!(unnest.options)?,
944                }))
945            }
946            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
947                let static_term = recursive_query_node
948                    .static_term
949                    .as_ref()
950                    .ok_or_else(|| DataFusionError::Internal(String::from(
951                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
952                    )))?
953                    .try_into_logical_plan(ctx, extension_codec)?;
954
955                let recursive_term = recursive_query_node
956                    .recursive_term
957                    .as_ref()
958                    .ok_or_else(|| DataFusionError::Internal(String::from(
959                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
960                    )))?
961                    .try_into_logical_plan(ctx, extension_codec)?;
962
963                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
964                    name: recursive_query_node.name.clone(),
965                    static_term: Arc::new(static_term),
966                    recursive_term: Arc::new(recursive_term),
967                    is_distinct: recursive_query_node.is_distinct,
968                }))
969            }
970            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
971                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
972                let schema = convert_required!(*schema)?;
973                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
974                LogicalPlanBuilder::scan(
975                    name.as_str(),
976                    provider_as_source(Arc::new(cte_work_table)),
977                    None,
978                )?
979                .build()
980            }
981            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
982                datafusion::logical_expr::DmlStatement::new(
983                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
984                    to_table_source(&dml_node.target, ctx, extension_codec)?,
985                    dml_node.dml_type().into(),
986                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
987                ),
988            )),
989        }
990    }
991
992    fn try_from_logical_plan(
993        plan: &LogicalPlan,
994        extension_codec: &dyn LogicalExtensionCodec,
995    ) -> Result<Self>
996    where
997        Self: Sized,
998    {
999        match plan {
1000            LogicalPlan::Values(Values { values, .. }) => {
1001                let n_cols = if values.is_empty() {
1002                    0
1003                } else {
1004                    values[0].len()
1005                } as u64;
1006                let values_list =
1007                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1008                Ok(LogicalPlanNode {
1009                    logical_plan_type: Some(LogicalPlanType::Values(
1010                        protobuf::ValuesNode {
1011                            n_cols,
1012                            values_list,
1013                        },
1014                    )),
1015                })
1016            }
1017            LogicalPlan::TableScan(TableScan {
1018                table_name,
1019                source,
1020                filters,
1021                projection,
1022                ..
1023            }) => {
1024                let provider = source_as_provider(source)?;
1025                let schema = provider.schema();
1026                let source = provider.as_any();
1027
1028                let projection = match projection {
1029                    None => None,
1030                    Some(columns) => {
1031                        let column_names = columns
1032                            .iter()
1033                            .map(|i| schema.field(*i).name().to_owned())
1034                            .collect();
1035                        Some(protobuf::ProjectionColumns {
1036                            columns: column_names,
1037                        })
1038                    }
1039                };
1040                let schema: protobuf::Schema = schema.as_ref().try_into()?;
1041
1042                let filters: Vec<protobuf::LogicalExprNode> =
1043                    serialize_exprs(filters, extension_codec)?;
1044
1045                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1046                    let any = listing_table.options().format.as_any();
1047                    let file_format_type = {
1048                        let mut maybe_some_type = None;
1049
1050                        #[cfg(feature = "parquet")]
1051                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1052                            let options = parquet.options();
1053                            maybe_some_type =
1054                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1055                                    options: Some(options.try_into()?),
1056                                }));
1057                        };
1058
1059                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1060                            let options = csv.options();
1061                            maybe_some_type =
1062                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1063                                    options: Some(options.try_into()?),
1064                                }));
1065                        }
1066
1067                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1068                            let options = json.options();
1069                            maybe_some_type =
1070                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1071                                    options: Some(options.try_into()?),
1072                                }))
1073                        }
1074
1075                        if any.is::<AvroFormat>() {
1076                            maybe_some_type =
1077                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1078                        }
1079
1080                        if let Some(file_format_type) = maybe_some_type {
1081                            file_format_type
1082                        } else {
1083                            return Err(proto_error(format!(
1084                            "Error converting file format, {:?} is invalid as a datafusion format.",
1085                            listing_table.options().format
1086                        )));
1087                        }
1088                    };
1089
1090                    let options = listing_table.options();
1091
1092                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1093                    for order in &options.file_sort_order {
1094                        let expr_vec = SortExprNodeCollection {
1095                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1096                        };
1097                        exprs_vec.push(expr_vec);
1098                    }
1099
1100                    Ok(LogicalPlanNode {
1101                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1102                            protobuf::ListingTableScanNode {
1103                                file_format_type: Some(file_format_type),
1104                                table_name: Some(table_name.clone().into()),
1105                                collect_stat: options.collect_stat,
1106                                file_extension: options.file_extension.clone(),
1107                                table_partition_cols: options
1108                                    .table_partition_cols
1109                                    .iter()
1110                                    .map(|x| x.0.clone())
1111                                    .collect::<Vec<_>>(),
1112                                paths: listing_table
1113                                    .table_paths()
1114                                    .iter()
1115                                    .map(|x| x.to_string())
1116                                    .collect(),
1117                                schema: Some(schema),
1118                                projection,
1119                                filters,
1120                                target_partitions: options.target_partitions as u32,
1121                                file_sort_order: exprs_vec,
1122                            },
1123                        )),
1124                    })
1125                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1126                    Ok(LogicalPlanNode {
1127                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1128                            protobuf::ViewTableScanNode {
1129                                table_name: Some(table_name.clone().into()),
1130                                input: Some(Box::new(
1131                                    LogicalPlanNode::try_from_logical_plan(
1132                                        view_table.logical_plan(),
1133                                        extension_codec,
1134                                    )?,
1135                                )),
1136                                schema: Some(schema),
1137                                projection,
1138                                definition: view_table
1139                                    .definition()
1140                                    .map(|s| s.to_string())
1141                                    .unwrap_or_default(),
1142                            },
1143                        ))),
1144                    })
1145                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1146                {
1147                    let name = cte_work_table.name().to_string();
1148                    let schema = cte_work_table.schema();
1149                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1150
1151                    Ok(LogicalPlanNode {
1152                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1153                            protobuf::CteWorkTableScanNode {
1154                                name,
1155                                schema: Some(schema),
1156                            },
1157                        )),
1158                    })
1159                } else {
1160                    let mut bytes = vec![];
1161                    extension_codec
1162                        .try_encode_table_provider(table_name, provider, &mut bytes)
1163                        .map_err(|e| context!("Error serializing custom table", e))?;
1164                    let scan = CustomScan(CustomTableScanNode {
1165                        table_name: Some(table_name.clone().into()),
1166                        projection,
1167                        schema: Some(schema),
1168                        filters,
1169                        custom_table_data: bytes,
1170                    });
1171                    let node = LogicalPlanNode {
1172                        logical_plan_type: Some(scan),
1173                    };
1174                    Ok(node)
1175                }
1176            }
1177            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1178                Ok(LogicalPlanNode {
1179                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1180                        protobuf::ProjectionNode {
1181                            input: Some(Box::new(
1182                                LogicalPlanNode::try_from_logical_plan(
1183                                    input.as_ref(),
1184                                    extension_codec,
1185                                )?,
1186                            )),
1187                            expr: serialize_exprs(expr, extension_codec)?,
1188                            optional_alias: None,
1189                        },
1190                    ))),
1191                })
1192            }
1193            LogicalPlan::Filter(filter) => {
1194                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1195                    filter.input.as_ref(),
1196                    extension_codec,
1197                )?;
1198                Ok(LogicalPlanNode {
1199                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1200                        protobuf::SelectionNode {
1201                            input: Some(Box::new(input)),
1202                            expr: Some(serialize_expr(
1203                                &filter.predicate,
1204                                extension_codec,
1205                            )?),
1206                        },
1207                    ))),
1208                })
1209            }
1210            LogicalPlan::Distinct(Distinct::All(input)) => {
1211                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1212                    input.as_ref(),
1213                    extension_codec,
1214                )?;
1215                Ok(LogicalPlanNode {
1216                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1217                        protobuf::DistinctNode {
1218                            input: Some(Box::new(input)),
1219                        },
1220                    ))),
1221                })
1222            }
1223            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1224                on_expr,
1225                select_expr,
1226                sort_expr,
1227                input,
1228                ..
1229            })) => {
1230                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1231                    input.as_ref(),
1232                    extension_codec,
1233                )?;
1234                let sort_expr = match sort_expr {
1235                    None => vec![],
1236                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1237                };
1238                Ok(LogicalPlanNode {
1239                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1240                        protobuf::DistinctOnNode {
1241                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1242                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1243                            sort_expr,
1244                            input: Some(Box::new(input)),
1245                        },
1246                    ))),
1247                })
1248            }
1249            LogicalPlan::Window(Window {
1250                input, window_expr, ..
1251            }) => {
1252                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1253                    input.as_ref(),
1254                    extension_codec,
1255                )?;
1256                Ok(LogicalPlanNode {
1257                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1258                        protobuf::WindowNode {
1259                            input: Some(Box::new(input)),
1260                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1261                        },
1262                    ))),
1263                })
1264            }
1265            LogicalPlan::Aggregate(Aggregate {
1266                group_expr,
1267                aggr_expr,
1268                input,
1269                ..
1270            }) => {
1271                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1272                    input.as_ref(),
1273                    extension_codec,
1274                )?;
1275                Ok(LogicalPlanNode {
1276                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1277                        protobuf::AggregateNode {
1278                            input: Some(Box::new(input)),
1279                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1280                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1281                        },
1282                    ))),
1283                })
1284            }
1285            LogicalPlan::Join(Join {
1286                left,
1287                right,
1288                on,
1289                filter,
1290                join_type,
1291                join_constraint,
1292                null_equals_null,
1293                ..
1294            }) => {
1295                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1296                    left.as_ref(),
1297                    extension_codec,
1298                )?;
1299                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1300                    right.as_ref(),
1301                    extension_codec,
1302                )?;
1303                let (left_join_key, right_join_key) = on
1304                    .iter()
1305                    .map(|(l, r)| {
1306                        Ok((
1307                            serialize_expr(l, extension_codec)?,
1308                            serialize_expr(r, extension_codec)?,
1309                        ))
1310                    })
1311                    .collect::<Result<Vec<_>, ToProtoError>>()?
1312                    .into_iter()
1313                    .unzip();
1314                let join_type: protobuf::JoinType = join_type.to_owned().into();
1315                let join_constraint: protobuf::JoinConstraint =
1316                    join_constraint.to_owned().into();
1317                let filter = filter
1318                    .as_ref()
1319                    .map(|e| serialize_expr(e, extension_codec))
1320                    .map_or(Ok(None), |v| v.map(Some))?;
1321                Ok(LogicalPlanNode {
1322                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1323                        protobuf::JoinNode {
1324                            left: Some(Box::new(left)),
1325                            right: Some(Box::new(right)),
1326                            join_type: join_type.into(),
1327                            join_constraint: join_constraint.into(),
1328                            left_join_key,
1329                            right_join_key,
1330                            null_equals_null: *null_equals_null,
1331                            filter,
1332                        },
1333                    ))),
1334                })
1335            }
1336            LogicalPlan::Subquery(_) => {
1337                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1338            }
1339            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1340                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1341                    input.as_ref(),
1342                    extension_codec,
1343                )?;
1344                Ok(LogicalPlanNode {
1345                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1346                        protobuf::SubqueryAliasNode {
1347                            input: Some(Box::new(input)),
1348                            alias: Some((*alias).clone().into()),
1349                        },
1350                    ))),
1351                })
1352            }
1353            LogicalPlan::Limit(limit) => {
1354                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1355                    limit.input.as_ref(),
1356                    extension_codec,
1357                )?;
1358                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1359                    return Err(proto_error(
1360                        "LogicalPlan::Limit only supports literal skip values",
1361                    ));
1362                };
1363                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1364                    return Err(proto_error(
1365                        "LogicalPlan::Limit only supports literal fetch values",
1366                    ));
1367                };
1368
1369                Ok(LogicalPlanNode {
1370                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1371                        protobuf::LimitNode {
1372                            input: Some(Box::new(input)),
1373                            skip: skip as i64,
1374                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1375                        },
1376                    ))),
1377                })
1378            }
1379            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1380                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1381                    input.as_ref(),
1382                    extension_codec,
1383                )?;
1384                let sort_expr: Vec<protobuf::SortExprNode> =
1385                    serialize_sorts(expr, extension_codec)?;
1386                Ok(LogicalPlanNode {
1387                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1388                        protobuf::SortNode {
1389                            input: Some(Box::new(input)),
1390                            expr: sort_expr,
1391                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1392                        },
1393                    ))),
1394                })
1395            }
1396            LogicalPlan::Repartition(Repartition {
1397                input,
1398                partitioning_scheme,
1399            }) => {
1400                use datafusion::logical_expr::Partitioning;
1401                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1402                    input.as_ref(),
1403                    extension_codec,
1404                )?;
1405
1406                // Assumed common usize field was batch size
1407                // Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
1408                use protobuf::repartition_node::PartitionMethod;
1409
1410                let pb_partition_method = match partitioning_scheme {
1411                    Partitioning::Hash(exprs, partition_count) => {
1412                        PartitionMethod::Hash(protobuf::HashRepartition {
1413                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1414                            partition_count: *partition_count as u64,
1415                        })
1416                    }
1417                    Partitioning::RoundRobinBatch(partition_count) => {
1418                        PartitionMethod::RoundRobin(*partition_count as u64)
1419                    }
1420                    Partitioning::DistributeBy(_) => {
1421                        return not_impl_err!("DistributeBy")
1422                    }
1423                };
1424
1425                Ok(LogicalPlanNode {
1426                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1427                        protobuf::RepartitionNode {
1428                            input: Some(Box::new(input)),
1429                            partition_method: Some(pb_partition_method),
1430                        },
1431                    ))),
1432                })
1433            }
1434            LogicalPlan::EmptyRelation(EmptyRelation {
1435                produce_one_row, ..
1436            }) => Ok(LogicalPlanNode {
1437                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1438                    protobuf::EmptyRelationNode {
1439                        produce_one_row: *produce_one_row,
1440                    },
1441                )),
1442            }),
1443            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1444                CreateExternalTable {
1445                    name,
1446                    location,
1447                    file_type,
1448                    schema: df_schema,
1449                    table_partition_cols,
1450                    if_not_exists,
1451                    definition,
1452                    order_exprs,
1453                    unbounded,
1454                    options,
1455                    constraints,
1456                    column_defaults,
1457                    temporary,
1458                },
1459            )) => {
1460                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1461                for order in order_exprs {
1462                    let temp = SortExprNodeCollection {
1463                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1464                    };
1465                    converted_order_exprs.push(temp);
1466                }
1467
1468                let mut converted_column_defaults =
1469                    HashMap::with_capacity(column_defaults.len());
1470                for (col_name, expr) in column_defaults {
1471                    converted_column_defaults
1472                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1473                }
1474
1475                Ok(LogicalPlanNode {
1476                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1477                        protobuf::CreateExternalTableNode {
1478                            name: Some(name.clone().into()),
1479                            location: location.clone(),
1480                            file_type: file_type.clone(),
1481                            schema: Some(df_schema.try_into()?),
1482                            table_partition_cols: table_partition_cols.clone(),
1483                            if_not_exists: *if_not_exists,
1484                            temporary: *temporary,
1485                            order_exprs: converted_order_exprs,
1486                            definition: definition.clone().unwrap_or_default(),
1487                            unbounded: *unbounded,
1488                            options: options.clone(),
1489                            constraints: Some(constraints.clone().into()),
1490                            column_defaults: converted_column_defaults,
1491                        },
1492                    )),
1493                })
1494            }
1495            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1496                name,
1497                input,
1498                or_replace,
1499                definition,
1500                temporary,
1501            })) => Ok(LogicalPlanNode {
1502                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1503                    protobuf::CreateViewNode {
1504                        name: Some(name.clone().into()),
1505                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1506                            input,
1507                            extension_codec,
1508                        )?)),
1509                        or_replace: *or_replace,
1510                        temporary: *temporary,
1511                        definition: definition.clone().unwrap_or_default(),
1512                    },
1513                ))),
1514            }),
1515            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1516                CreateCatalogSchema {
1517                    schema_name,
1518                    if_not_exists,
1519                    schema: df_schema,
1520                },
1521            )) => Ok(LogicalPlanNode {
1522                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1523                    protobuf::CreateCatalogSchemaNode {
1524                        schema_name: schema_name.clone(),
1525                        if_not_exists: *if_not_exists,
1526                        schema: Some(df_schema.try_into()?),
1527                    },
1528                )),
1529            }),
1530            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1531                catalog_name,
1532                if_not_exists,
1533                schema: df_schema,
1534            })) => Ok(LogicalPlanNode {
1535                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1536                    protobuf::CreateCatalogNode {
1537                        catalog_name: catalog_name.clone(),
1538                        if_not_exists: *if_not_exists,
1539                        schema: Some(df_schema.try_into()?),
1540                    },
1541                )),
1542            }),
1543            LogicalPlan::Analyze(a) => {
1544                let input = LogicalPlanNode::try_from_logical_plan(
1545                    a.input.as_ref(),
1546                    extension_codec,
1547                )?;
1548                Ok(LogicalPlanNode {
1549                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1550                        protobuf::AnalyzeNode {
1551                            input: Some(Box::new(input)),
1552                            verbose: a.verbose,
1553                        },
1554                    ))),
1555                })
1556            }
1557            LogicalPlan::Explain(a) => {
1558                let input = LogicalPlanNode::try_from_logical_plan(
1559                    a.plan.as_ref(),
1560                    extension_codec,
1561                )?;
1562                Ok(LogicalPlanNode {
1563                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1564                        protobuf::ExplainNode {
1565                            input: Some(Box::new(input)),
1566                            verbose: a.verbose,
1567                        },
1568                    ))),
1569                })
1570            }
1571            LogicalPlan::Union(union) => {
1572                let inputs: Vec<LogicalPlanNode> = union
1573                    .inputs
1574                    .iter()
1575                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1576                    .collect::<Result<_>>()?;
1577                Ok(LogicalPlanNode {
1578                    logical_plan_type: Some(LogicalPlanType::Union(
1579                        protobuf::UnionNode { inputs },
1580                    )),
1581                })
1582            }
1583            LogicalPlan::Extension(extension) => {
1584                let mut buf: Vec<u8> = vec![];
1585                extension_codec.try_encode(extension, &mut buf)?;
1586
1587                let inputs: Vec<LogicalPlanNode> = extension
1588                    .node
1589                    .inputs()
1590                    .iter()
1591                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1592                    .collect::<Result<_>>()?;
1593
1594                Ok(LogicalPlanNode {
1595                    logical_plan_type: Some(LogicalPlanType::Extension(
1596                        LogicalExtensionNode { node: buf, inputs },
1597                    )),
1598                })
1599            }
1600            LogicalPlan::Statement(Statement::Prepare(Prepare {
1601                name,
1602                data_types,
1603                input,
1604            })) => {
1605                let input =
1606                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1607                Ok(LogicalPlanNode {
1608                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1609                        protobuf::PrepareNode {
1610                            name: name.clone(),
1611                            data_types: data_types
1612                                .iter()
1613                                .map(|t| t.try_into())
1614                                .collect::<Result<Vec<_>, _>>()?,
1615                            input: Some(Box::new(input)),
1616                        },
1617                    ))),
1618                })
1619            }
1620            LogicalPlan::Unnest(Unnest {
1621                input,
1622                exec_columns,
1623                list_type_columns,
1624                struct_type_columns,
1625                dependency_indices,
1626                schema,
1627                options,
1628            }) => {
1629                let input =
1630                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1631                let proto_unnest_list_items = list_type_columns
1632                    .iter()
1633                    .map(|(index, ul)| ColumnUnnestListItem {
1634                        input_index: *index as _,
1635                        recursion: Some(ColumnUnnestListRecursion {
1636                            output_column: Some(ul.output_column.to_owned().into()),
1637                            depth: ul.depth as _,
1638                        }),
1639                    })
1640                    .collect();
1641                Ok(LogicalPlanNode {
1642                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1643                        protobuf::UnnestNode {
1644                            input: Some(Box::new(input)),
1645                            exec_columns: exec_columns
1646                                .iter()
1647                                .map(|col| col.into())
1648                                .collect(),
1649                            list_type_columns: proto_unnest_list_items,
1650                            struct_type_columns: struct_type_columns
1651                                .iter()
1652                                .map(|c| *c as u64)
1653                                .collect(),
1654                            dependency_indices: dependency_indices
1655                                .iter()
1656                                .map(|c| *c as u64)
1657                                .collect(),
1658                            schema: Some(schema.try_into()?),
1659                            options: Some(options.into()),
1660                        },
1661                    ))),
1662                })
1663            }
1664            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1665                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1666            )),
1667            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1668                "LogicalPlan serde is not yet implemented for CreateIndex",
1669            )),
1670            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1671                "LogicalPlan serde is not yet implemented for DropTable",
1672            )),
1673            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1674                name,
1675                if_exists,
1676                schema,
1677            })) => Ok(LogicalPlanNode {
1678                logical_plan_type: Some(LogicalPlanType::DropView(
1679                    protobuf::DropViewNode {
1680                        name: Some(name.clone().into()),
1681                        if_exists: *if_exists,
1682                        schema: Some(schema.try_into()?),
1683                    },
1684                )),
1685            }),
1686            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1687                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1688            )),
1689            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1690                "LogicalPlan serde is not yet implemented for CreateFunction",
1691            )),
1692            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1693                "LogicalPlan serde is not yet implemented for DropFunction",
1694            )),
1695            LogicalPlan::Statement(_) => Err(proto_error(
1696                "LogicalPlan serde is not yet implemented for Statement",
1697            )),
1698            LogicalPlan::Dml(DmlStatement {
1699                table_name,
1700                target,
1701                op,
1702                input,
1703                ..
1704            }) => {
1705                let input =
1706                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1707                let dml_type: dml_node::Type = op.into();
1708                Ok(LogicalPlanNode {
1709                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1710                        input: Some(Box::new(input)),
1711                        target: Some(Box::new(from_table_source(
1712                            table_name.clone(),
1713                            Arc::clone(target),
1714                            extension_codec,
1715                        )?)),
1716                        table_name: Some(table_name.clone().into()),
1717                        dml_type: dml_type.into(),
1718                    }))),
1719                })
1720            }
1721            LogicalPlan::Copy(dml::CopyTo {
1722                input,
1723                output_url,
1724                file_type,
1725                partition_by,
1726                ..
1727            }) => {
1728                let input =
1729                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1730                let mut buf = Vec::new();
1731                extension_codec
1732                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1733
1734                Ok(LogicalPlanNode {
1735                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1736                        protobuf::CopyToNode {
1737                            input: Some(Box::new(input)),
1738                            output_url: output_url.to_string(),
1739                            file_type: buf,
1740                            partition_by: partition_by.clone(),
1741                        },
1742                    ))),
1743                })
1744            }
1745            LogicalPlan::DescribeTable(_) => Err(proto_error(
1746                "LogicalPlan serde is not yet implemented for DescribeTable",
1747            )),
1748            LogicalPlan::RecursiveQuery(recursive) => {
1749                let static_term = LogicalPlanNode::try_from_logical_plan(
1750                    recursive.static_term.as_ref(),
1751                    extension_codec,
1752                )?;
1753                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1754                    recursive.recursive_term.as_ref(),
1755                    extension_codec,
1756                )?;
1757
1758                Ok(LogicalPlanNode {
1759                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1760                        protobuf::RecursiveQueryNode {
1761                            name: recursive.name.clone(),
1762                            static_term: Some(Box::new(static_term)),
1763                            recursive_term: Some(Box::new(recursive_term)),
1764                            is_distinct: recursive.is_distinct,
1765                        },
1766                    ))),
1767                })
1768            }
1769        }
1770    }
1771}