datafusion_proto/logical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, listing_table_scan_node::FileFormatType,
31        logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32    },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43    file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46    datasource::{
47        file_format::{
48            csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49        },
50        listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51        view::ViewTable,
52        TableProvider,
53    },
54    datasource::{provider_as_source, source_as_provider},
55    prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59    context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60    DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63    dml,
64    logical_plan::{
65        builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66        CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67        Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68        SubqueryAlias, TableScan, Values, Window,
69    },
70    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71    Statement, WindowUDF,
72};
73use datafusion_expr::{
74    AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
75};
76
77use self::to_proto::{serialize_expr, serialize_exprs};
78use crate::logical_plan::to_proto::serialize_sorts;
79use prost::bytes::BufMut;
80use prost::Message;
81
82pub mod file_formats;
83pub mod from_proto;
84pub mod to_proto;
85
86pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
87    fn try_decode(buf: &[u8]) -> Result<Self>
88    where
89        Self: Sized;
90
91    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
92    where
93        B: BufMut,
94        Self: Sized;
95
96    fn try_into_logical_plan(
97        &self,
98        ctx: &SessionContext,
99        extension_codec: &dyn LogicalExtensionCodec,
100    ) -> Result<LogicalPlan>;
101
102    fn try_from_logical_plan(
103        plan: &LogicalPlan,
104        extension_codec: &dyn LogicalExtensionCodec,
105    ) -> Result<Self>
106    where
107        Self: Sized;
108}
109
110pub trait LogicalExtensionCodec: Debug + Send + Sync {
111    fn try_decode(
112        &self,
113        buf: &[u8],
114        inputs: &[LogicalPlan],
115        ctx: &SessionContext,
116    ) -> Result<Extension>;
117
118    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
119
120    fn try_decode_table_provider(
121        &self,
122        buf: &[u8],
123        table_ref: &TableReference,
124        schema: SchemaRef,
125        ctx: &SessionContext,
126    ) -> Result<Arc<dyn TableProvider>>;
127
128    fn try_encode_table_provider(
129        &self,
130        table_ref: &TableReference,
131        node: Arc<dyn TableProvider>,
132        buf: &mut Vec<u8>,
133    ) -> Result<()>;
134
135    fn try_decode_file_format(
136        &self,
137        _buf: &[u8],
138        _ctx: &SessionContext,
139    ) -> Result<Arc<dyn FileFormatFactory>> {
140        not_impl_err!("LogicalExtensionCodec is not provided for file format")
141    }
142
143    fn try_encode_file_format(
144        &self,
145        _buf: &mut Vec<u8>,
146        _node: Arc<dyn FileFormatFactory>,
147    ) -> Result<()> {
148        Ok(())
149    }
150
151    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
152        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
153    }
154
155    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
156        Ok(())
157    }
158
159    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
160        not_impl_err!(
161            "LogicalExtensionCodec is not provided for aggregate function {name}"
162        )
163    }
164
165    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
166        Ok(())
167    }
168
169    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
170        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
171    }
172
173    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
174        Ok(())
175    }
176}
177
/// A [`LogicalExtensionCodec`] that supports no extensions.
///
/// Its implementations of the required codec methods all return a
/// "not implemented" error. The type carries no state, so it can be created
/// either with `DefaultLogicalExtensionCodec {}` or through [`Default`].
#[derive(Debug, Clone, Default)]
pub struct DefaultLogicalExtensionCodec {}
180
181impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
182    fn try_decode(
183        &self,
184        _buf: &[u8],
185        _inputs: &[LogicalPlan],
186        _ctx: &SessionContext,
187    ) -> Result<Extension> {
188        not_impl_err!("LogicalExtensionCodec is not provided")
189    }
190
191    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
192        not_impl_err!("LogicalExtensionCodec is not provided")
193    }
194
195    fn try_decode_table_provider(
196        &self,
197        _buf: &[u8],
198        _table_ref: &TableReference,
199        _schema: SchemaRef,
200        _ctx: &SessionContext,
201    ) -> Result<Arc<dyn TableProvider>> {
202        not_impl_err!("LogicalExtensionCodec is not provided")
203    }
204
205    fn try_encode_table_provider(
206        &self,
207        _table_ref: &TableReference,
208        _node: Arc<dyn TableProvider>,
209        _buf: &mut Vec<u8>,
210    ) -> Result<()> {
211        not_impl_err!("LogicalExtensionCodec is not provided")
212    }
213}
214
/// Converts an optional, boxed protobuf plan node into a logical plan.
///
/// * `$PB` - an expression whose `as_ref()` yields an `Option` of a
///   reference to the node holder (in this file, an
///   `Option<Box<LogicalPlanNode>>` field).
/// * `$CTX`, `$CODEC` - forwarded unchanged to `try_into_logical_plan`.
///
/// Evaluates to the `Result` returned by `try_into_logical_plan` when the
/// field is present, and to an `Err` built with `proto_error` when it is
/// `None`.
///
/// `proto_error` is named through `$crate` so that the macro expands
/// correctly even where the call site has not imported that function. The
/// method call itself still needs whatever provides `try_into_logical_plan`
/// to be in scope at the call site.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(field) => field.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            )),
        }
    }};
}
225
226fn from_table_reference(
227    table_ref: Option<&protobuf::TableReference>,
228    error_context: &str,
229) -> Result<TableReference> {
230    let table_ref = table_ref.ok_or_else(|| {
231        DataFusionError::Internal(format!(
232            "Protobuf deserialization error, {error_context} was missing required field name."
233        ))
234    })?;
235
236    Ok(table_ref.clone().try_into()?)
237}
238
239/// Converts [LogicalPlan::TableScan] to [TableSource]
240/// method to be used to deserialize nodes
241/// serialized by [from_table_source]
242fn to_table_source(
243    node: &Option<Box<LogicalPlanNode>>,
244    ctx: &SessionContext,
245    extension_codec: &dyn LogicalExtensionCodec,
246) -> Result<Arc<dyn TableSource>> {
247    if let Some(node) = node {
248        match node.try_into_logical_plan(ctx, extension_codec)? {
249            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
250            _ => plan_err!("expected TableScan node"),
251        }
252    } else {
253        plan_err!("LogicalPlanNode should be provided")
254    }
255}
256
257/// converts [TableSource] to [LogicalPlan::TableScan]
258/// using [LogicalPlan::TableScan] was the best approach to
259/// serialize [TableSource] to [LogicalPlan::TableScan]
260fn from_table_source(
261    table_name: TableReference,
262    target: Arc<dyn TableSource>,
263    extension_codec: &dyn LogicalExtensionCodec,
264) -> Result<LogicalPlanNode> {
265    let projected_schema = target.schema().to_dfschema_ref()?;
266    let r = LogicalPlan::TableScan(TableScan {
267        table_name,
268        source: target,
269        projection: None,
270        projected_schema,
271        filters: vec![],
272        fetch: None,
273    });
274
275    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
276}
277
278impl AsLogicalPlan for LogicalPlanNode {
279    fn try_decode(buf: &[u8]) -> Result<Self>
280    where
281        Self: Sized,
282    {
283        LogicalPlanNode::decode(buf).map_err(|e| {
284            DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
285        })
286    }
287
288    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
289    where
290        B: BufMut,
291        Self: Sized,
292    {
293        self.encode(buf).map_err(|e| {
294            DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
295        })
296    }
297
298    fn try_into_logical_plan(
299        &self,
300        ctx: &SessionContext,
301        extension_codec: &dyn LogicalExtensionCodec,
302    ) -> Result<LogicalPlan> {
303        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
304            proto_error(format!(
305                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
306            ))
307        })?;
308        match plan {
309            LogicalPlanType::Values(values) => {
310                let n_cols = values.n_cols as usize;
311                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
312                    Ok(Vec::new())
313                } else if values.values_list.len() % n_cols != 0 {
314                    internal_err!(
315                        "Invalid values list length, expect {} to be divisible by {}",
316                        values.values_list.len(),
317                        n_cols
318                    )
319                } else {
320                    values
321                        .values_list
322                        .chunks_exact(n_cols)
323                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
324                        .collect::<Result<Vec<_>, _>>()
325                        .map_err(|e| e.into())
326                }?;
327
328                LogicalPlanBuilder::values(values)?.build()
329            }
330            LogicalPlanType::Projection(projection) => {
331                let input: LogicalPlan =
332                    into_logical_plan!(projection.input, ctx, extension_codec)?;
333                let expr: Vec<Expr> =
334                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
335
336                let new_proj = project(input, expr)?;
337                match projection.optional_alias.as_ref() {
338                    Some(a) => match a {
339                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
340                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
341                                Arc::new(new_proj),
342                                alias.clone(),
343                            )?))
344                        }
345                    },
346                    _ => Ok(new_proj),
347                }
348            }
349            LogicalPlanType::Selection(selection) => {
350                let input: LogicalPlan =
351                    into_logical_plan!(selection.input, ctx, extension_codec)?;
352                let expr: Expr = selection
353                    .expr
354                    .as_ref()
355                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
356                    .transpose()?
357                    .ok_or_else(|| proto_error("expression required"))?;
358                LogicalPlanBuilder::from(input).filter(expr)?.build()
359            }
360            LogicalPlanType::Window(window) => {
361                let input: LogicalPlan =
362                    into_logical_plan!(window.input, ctx, extension_codec)?;
363                let window_expr =
364                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
365                LogicalPlanBuilder::from(input).window(window_expr)?.build()
366            }
367            LogicalPlanType::Aggregate(aggregate) => {
368                let input: LogicalPlan =
369                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
370                let group_expr =
371                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
372                let aggr_expr =
373                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
374                LogicalPlanBuilder::from(input)
375                    .aggregate(group_expr, aggr_expr)?
376                    .build()
377            }
378            LogicalPlanType::ListingScan(scan) => {
379                let schema: Schema = convert_required!(scan.schema)?;
380
381                let mut projection = None;
382                if let Some(columns) = &scan.projection {
383                    let column_indices = columns
384                        .columns
385                        .iter()
386                        .map(|name| schema.index_of(name))
387                        .collect::<Result<Vec<usize>, _>>()?;
388                    projection = Some(column_indices);
389                }
390
391                let filters =
392                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
393
394                let mut all_sort_orders = vec![];
395                for order in &scan.file_sort_order {
396                    all_sort_orders.push(from_proto::parse_sorts(
397                        &order.sort_expr_nodes,
398                        ctx,
399                        extension_codec,
400                    )?)
401                }
402
403                let file_format: Arc<dyn FileFormat> =
404                    match scan.file_format_type.as_ref().ok_or_else(|| {
405                        proto_error(format!(
406                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
407                        ))
408                    })? {
409                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
410                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
411                            #[cfg(feature = "parquet")]
412                            {
413                                let mut parquet = ParquetFormat::default();
414                                if let Some(options) = options {
415                                    parquet = parquet.with_options(options.try_into()?)
416                                }
417                                Arc::new(parquet)
418                            }
419                            #[cfg(not(feature = "parquet"))]
420                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
421                        }
422                        FileFormatType::Csv(protobuf::CsvFormat {
423                            options
424                        }) => {
425                            let mut csv = CsvFormat::default();
426                            if let Some(options) = options {
427                                csv = csv.with_options(options.try_into()?)
428                            }
429                            Arc::new(csv)
430                        },
431                        FileFormatType::Json(protobuf::NdJsonFormat {
432                            options
433                        }) => {
434                            let mut json = OtherNdJsonFormat::default();
435                            if let Some(options) = options {
436                                json = json.with_options(options.try_into()?)
437                            }
438                            Arc::new(json)
439                        }
440                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
441                        FileFormatType::Avro(..) => {
442                            #[cfg(feature = "avro")] 
443                            {
444                                Arc::new(AvroFormat)
445                            }
446                            #[cfg(not(feature = "avro"))]
447                            panic!("Unable to process avro file since `avro` feature is not enabled");
448                        }
449                    };
450
451                let table_paths = &scan
452                    .paths
453                    .iter()
454                    .map(ListingTableUrl::parse)
455                    .collect::<Result<Vec<_>, _>>()?;
456
457                let partition_columns = scan
458                    .table_partition_cols
459                    .iter()
460                    .map(|col| {
461                        let Some(arrow_type) = col.arrow_type.as_ref() else {
462                            return Err(proto_error(
463                                "Missing Arrow type in partition columns",
464                            ));
465                        };
466                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
467                            proto_error(format!("Received an unknown ArrowType: {e}"))
468                        })?;
469                        Ok((col.name.clone(), arrow_type))
470                    })
471                    .collect::<Result<Vec<_>>>()?;
472
473                let options = ListingOptions::new(file_format)
474                    .with_file_extension(&scan.file_extension)
475                    .with_table_partition_cols(partition_columns)
476                    .with_collect_stat(scan.collect_stat)
477                    .with_target_partitions(scan.target_partitions as usize)
478                    .with_file_sort_order(all_sort_orders);
479
480                let config =
481                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
482                        .with_listing_options(options)
483                        .with_schema(Arc::new(schema));
484
485                let provider = ListingTable::try_new(config)?.with_cache(
486                    ctx.state()
487                        .runtime_env()
488                        .cache_manager
489                        .get_file_statistic_cache(),
490                );
491
492                let table_name =
493                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
494
495                LogicalPlanBuilder::scan_with_filters(
496                    table_name,
497                    provider_as_source(Arc::new(provider)),
498                    projection,
499                    filters,
500                )?
501                .build()
502            }
503            LogicalPlanType::CustomScan(scan) => {
504                let schema: Schema = convert_required!(scan.schema)?;
505                let schema = Arc::new(schema);
506                let mut projection = None;
507                if let Some(columns) = &scan.projection {
508                    let column_indices = columns
509                        .columns
510                        .iter()
511                        .map(|name| schema.index_of(name))
512                        .collect::<Result<Vec<usize>, _>>()?;
513                    projection = Some(column_indices);
514                }
515
516                let filters =
517                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
518
519                let table_name =
520                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
521
522                let provider = extension_codec.try_decode_table_provider(
523                    &scan.custom_table_data,
524                    &table_name,
525                    schema,
526                    ctx,
527                )?;
528
529                LogicalPlanBuilder::scan_with_filters(
530                    table_name,
531                    provider_as_source(provider),
532                    projection,
533                    filters,
534                )?
535                .build()
536            }
537            LogicalPlanType::Sort(sort) => {
538                let input: LogicalPlan =
539                    into_logical_plan!(sort.input, ctx, extension_codec)?;
540                let sort_expr: Vec<SortExpr> =
541                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
542                let fetch: Option<usize> = sort.fetch.try_into().ok();
543                LogicalPlanBuilder::from(input)
544                    .sort_with_limit(sort_expr, fetch)?
545                    .build()
546            }
547            LogicalPlanType::Repartition(repartition) => {
548                use datafusion::logical_expr::Partitioning;
549                let input: LogicalPlan =
550                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
551                use protobuf::repartition_node::PartitionMethod;
552                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
553                    internal_datafusion_err!(
554                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
555                    )
556                })?;
557
558                let partitioning_scheme = match pb_partition_method {
559                    PartitionMethod::Hash(protobuf::HashRepartition {
560                        hash_expr: pb_hash_expr,
561                        partition_count,
562                    }) => Partitioning::Hash(
563                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
564                        *partition_count as usize,
565                    ),
566                    PartitionMethod::RoundRobin(partition_count) => {
567                        Partitioning::RoundRobinBatch(*partition_count as usize)
568                    }
569                };
570
571                LogicalPlanBuilder::from(input)
572                    .repartition(partitioning_scheme)?
573                    .build()
574            }
575            LogicalPlanType::EmptyRelation(empty_relation) => {
576                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
577            }
578            LogicalPlanType::CreateExternalTable(create_extern_table) => {
579                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
580                    DataFusionError::Internal(String::from(
581                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
582                    ))
583                })?;
584
585                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
586                    DataFusionError::Internal(String::from(
587                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
588                    ))
589                })?;
590                let definition = if !create_extern_table.definition.is_empty() {
591                    Some(create_extern_table.definition.clone())
592                } else {
593                    None
594                };
595
596                let file_type = create_extern_table.file_type.as_str();
597                if ctx.table_factory(file_type).is_none() {
598                    internal_err!("No TableProviderFactory for file type: {file_type}")?
599                }
600
601                let mut order_exprs = vec![];
602                for expr in &create_extern_table.order_exprs {
603                    order_exprs.push(from_proto::parse_sorts(
604                        &expr.sort_expr_nodes,
605                        ctx,
606                        extension_codec,
607                    )?);
608                }
609
610                let mut column_defaults =
611                    HashMap::with_capacity(create_extern_table.column_defaults.len());
612                for (col_name, expr) in &create_extern_table.column_defaults {
613                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
614                    column_defaults.insert(col_name.clone(), expr);
615                }
616
617                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
618                    CreateExternalTable {
619                        schema: pb_schema.try_into()?,
620                        name: from_table_reference(
621                            create_extern_table.name.as_ref(),
622                            "CreateExternalTable",
623                        )?,
624                        location: create_extern_table.location.clone(),
625                        file_type: create_extern_table.file_type.clone(),
626                        table_partition_cols: create_extern_table
627                            .table_partition_cols
628                            .clone(),
629                        order_exprs,
630                        if_not_exists: create_extern_table.if_not_exists,
631                        temporary: create_extern_table.temporary,
632                        definition,
633                        unbounded: create_extern_table.unbounded,
634                        options: create_extern_table.options.clone(),
635                        constraints: constraints.into(),
636                        column_defaults,
637                    },
638                )))
639            }
640            LogicalPlanType::CreateView(create_view) => {
641                let plan = create_view
642                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
643                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
644                )))?
645                    .try_into_logical_plan(ctx, extension_codec)?;
646                let definition = if !create_view.definition.is_empty() {
647                    Some(create_view.definition.clone())
648                } else {
649                    None
650                };
651
652                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
653                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
654                    temporary: create_view.temporary,
655                    input: Arc::new(plan),
656                    or_replace: create_view.or_replace,
657                    definition,
658                })))
659            }
660            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
661                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
662                    DataFusionError::Internal(String::from(
663                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
664                    ))
665                })?;
666
667                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
668                    CreateCatalogSchema {
669                        schema_name: create_catalog_schema.schema_name.clone(),
670                        if_not_exists: create_catalog_schema.if_not_exists,
671                        schema: pb_schema.try_into()?,
672                    },
673                )))
674            }
675            LogicalPlanType::CreateCatalog(create_catalog) => {
676                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
677                    DataFusionError::Internal(String::from(
678                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
679                    ))
680                })?;
681
682                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
683                    CreateCatalog {
684                        catalog_name: create_catalog.catalog_name.clone(),
685                        if_not_exists: create_catalog.if_not_exists,
686                        schema: pb_schema.try_into()?,
687                    },
688                )))
689            }
690            LogicalPlanType::Analyze(analyze) => {
691                let input: LogicalPlan =
692                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
693                LogicalPlanBuilder::from(input)
694                    .explain(analyze.verbose, true)?
695                    .build()
696            }
697            LogicalPlanType::Explain(explain) => {
698                let input: LogicalPlan =
699                    into_logical_plan!(explain.input, ctx, extension_codec)?;
700                LogicalPlanBuilder::from(input)
701                    .explain(explain.verbose, false)?
702                    .build()
703            }
704            LogicalPlanType::SubqueryAlias(aliased_relation) => {
705                let input: LogicalPlan =
706                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
707                let alias = from_table_reference(
708                    aliased_relation.alias.as_ref(),
709                    "SubqueryAlias",
710                )?;
711                LogicalPlanBuilder::from(input).alias(alias)?.build()
712            }
713            LogicalPlanType::Limit(limit) => {
714                let input: LogicalPlan =
715                    into_logical_plan!(limit.input, ctx, extension_codec)?;
716                let skip = limit.skip.max(0) as usize;
717
718                let fetch = if limit.fetch < 0 {
719                    None
720                } else {
721                    Some(limit.fetch as usize)
722                };
723
724                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
725            }
726            LogicalPlanType::Join(join) => {
727                let left_keys: Vec<Expr> =
728                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
729                let right_keys: Vec<Expr> =
730                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
731                let join_type =
732                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
733                        proto_error(format!(
734                            "Received a JoinNode message with unknown JoinType {}",
735                            join.join_type
736                        ))
737                    })?;
738                let join_constraint = protobuf::JoinConstraint::try_from(
739                    join.join_constraint,
740                )
741                .map_err(|_| {
742                    proto_error(format!(
743                        "Received a JoinNode message with unknown JoinConstraint {}",
744                        join.join_constraint
745                    ))
746                })?;
747                let filter: Option<Expr> = join
748                    .filter
749                    .as_ref()
750                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
751                    .map_or(Ok(None), |v| v.map(Some))?;
752
753                let builder = LogicalPlanBuilder::from(into_logical_plan!(
754                    join.left,
755                    ctx,
756                    extension_codec
757                )?);
758                let builder = match join_constraint.into() {
759                    JoinConstraint::On => builder.join_with_expr_keys(
760                        into_logical_plan!(join.right, ctx, extension_codec)?,
761                        join_type.into(),
762                        (left_keys, right_keys),
763                        filter,
764                    )?,
765                    JoinConstraint::Using => {
766                        // The equijoin keys in a USING join must be column references.
767                        let using_keys = left_keys
768                            .into_iter()
769                            .map(|key| {
770                                key.try_as_col().cloned()
771                                    .ok_or_else(|| internal_datafusion_err!(
772                                        "Using join keys must be column references, got: {key:?}"
773                                    ))
774                            })
775                            .collect::<Result<Vec<_>, _>>()?;
776                        builder.join_using(
777                            into_logical_plan!(join.right, ctx, extension_codec)?,
778                            join_type.into(),
779                            using_keys,
780                        )?
781                    }
782                };
783
784                builder.build()
785            }
786            LogicalPlanType::Union(union) => {
787                if union.inputs.len() < 2 {
788                    return  Err( DataFusionError::Internal(String::from(
789                        "Protobuf deserialization error, Union was require at least two input.",
790                    )));
791                }
792                let (first, rest) = union.inputs.split_first().unwrap();
793                let mut builder = LogicalPlanBuilder::from(
794                    first.try_into_logical_plan(ctx, extension_codec)?,
795                );
796
797                for i in rest {
798                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
799                    builder = builder.union(plan)?;
800                }
801                builder.build()
802            }
803            LogicalPlanType::CrossJoin(crossjoin) => {
804                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
805                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
806
807                LogicalPlanBuilder::from(left).cross_join(right)?.build()
808            }
809            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
810                let input_plans: Vec<LogicalPlan> = inputs
811                    .iter()
812                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
813                    .collect::<Result<_>>()?;
814
815                let extension_node =
816                    extension_codec.try_decode(node, &input_plans, ctx)?;
817                Ok(LogicalPlan::Extension(extension_node))
818            }
819            LogicalPlanType::Distinct(distinct) => {
820                let input: LogicalPlan =
821                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
822                LogicalPlanBuilder::from(input).distinct()?.build()
823            }
824            LogicalPlanType::DistinctOn(distinct_on) => {
825                let input: LogicalPlan =
826                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
827                let on_expr =
828                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
829                let select_expr = from_proto::parse_exprs(
830                    &distinct_on.select_expr,
831                    ctx,
832                    extension_codec,
833                )?;
834                let sort_expr = match distinct_on.sort_expr.len() {
835                    0 => None,
836                    _ => Some(from_proto::parse_sorts(
837                        &distinct_on.sort_expr,
838                        ctx,
839                        extension_codec,
840                    )?),
841                };
842                LogicalPlanBuilder::from(input)
843                    .distinct_on(on_expr, select_expr, sort_expr)?
844                    .build()
845            }
846            LogicalPlanType::ViewScan(scan) => {
847                let schema: Schema = convert_required!(scan.schema)?;
848
849                let mut projection = None;
850                if let Some(columns) = &scan.projection {
851                    let column_indices = columns
852                        .columns
853                        .iter()
854                        .map(|name| schema.index_of(name))
855                        .collect::<Result<Vec<usize>, _>>()?;
856                    projection = Some(column_indices);
857                }
858
859                let input: LogicalPlan =
860                    into_logical_plan!(scan.input, ctx, extension_codec)?;
861
862                let definition = if !scan.definition.is_empty() {
863                    Some(scan.definition.clone())
864                } else {
865                    None
866                };
867
868                let provider = ViewTable::new(input, definition);
869
870                let table_name =
871                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
872
873                LogicalPlanBuilder::scan(
874                    table_name,
875                    provider_as_source(Arc::new(provider)),
876                    projection,
877                )?
878                .build()
879            }
880            LogicalPlanType::Prepare(prepare) => {
881                let input: LogicalPlan =
882                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
883                let data_types: Vec<DataType> = prepare
884                    .data_types
885                    .iter()
886                    .map(DataType::try_from)
887                    .collect::<Result<_, _>>()?;
888                LogicalPlanBuilder::from(input)
889                    .prepare(prepare.name.clone(), data_types)?
890                    .build()
891            }
892            LogicalPlanType::DropView(dropview) => {
893                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
894                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
895                    if_exists: dropview.if_exists,
896                    schema: Arc::new(convert_required!(dropview.schema)?),
897                })))
898            }
899            LogicalPlanType::CopyTo(copy) => {
900                let input: LogicalPlan =
901                    into_logical_plan!(copy.input, ctx, extension_codec)?;
902
903                let file_type: Arc<dyn FileType> = format_as_file_type(
904                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
905                );
906
907                Ok(LogicalPlan::Copy(dml::CopyTo::new(
908                    Arc::new(input),
909                    copy.output_url.clone(),
910                    copy.partition_by.clone(),
911                    file_type,
912                    Default::default(),
913                )))
914            }
915            LogicalPlanType::Unnest(unnest) => {
916                let input: LogicalPlan =
917                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
918
919                LogicalPlanBuilder::from(input)
920                    .unnest_columns_with_options(
921                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
922                        into_required!(unnest.options)?,
923                    )?
924                    .build()
925            }
926            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
927                let static_term = recursive_query_node
928                    .static_term
929                    .as_ref()
930                    .ok_or_else(|| DataFusionError::Internal(String::from(
931                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
932                    )))?
933                    .try_into_logical_plan(ctx, extension_codec)?;
934
935                let recursive_term = recursive_query_node
936                    .recursive_term
937                    .as_ref()
938                    .ok_or_else(|| DataFusionError::Internal(String::from(
939                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
940                    )))?
941                    .try_into_logical_plan(ctx, extension_codec)?;
942
943                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
944                    name: recursive_query_node.name.clone(),
945                    static_term: Arc::new(static_term),
946                    recursive_term: Arc::new(recursive_term),
947                    is_distinct: recursive_query_node.is_distinct,
948                }))
949            }
950            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
951                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
952                let schema = convert_required!(*schema)?;
953                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
954                LogicalPlanBuilder::scan(
955                    name.as_str(),
956                    provider_as_source(Arc::new(cte_work_table)),
957                    None,
958                )?
959                .build()
960            }
961            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
962                datafusion::logical_expr::DmlStatement::new(
963                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
964                    to_table_source(&dml_node.target, ctx, extension_codec)?,
965                    dml_node.dml_type().into(),
966                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
967                ),
968            )),
969        }
970    }
971
972    fn try_from_logical_plan(
973        plan: &LogicalPlan,
974        extension_codec: &dyn LogicalExtensionCodec,
975    ) -> Result<Self>
976    where
977        Self: Sized,
978    {
979        match plan {
980            LogicalPlan::Values(Values { values, .. }) => {
981                let n_cols = if values.is_empty() {
982                    0
983                } else {
984                    values[0].len()
985                } as u64;
986                let values_list =
987                    serialize_exprs(values.iter().flatten(), extension_codec)?;
988                Ok(LogicalPlanNode {
989                    logical_plan_type: Some(LogicalPlanType::Values(
990                        protobuf::ValuesNode {
991                            n_cols,
992                            values_list,
993                        },
994                    )),
995                })
996            }
997            LogicalPlan::TableScan(TableScan {
998                table_name,
999                source,
1000                filters,
1001                projection,
1002                ..
1003            }) => {
1004                let provider = source_as_provider(source)?;
1005                let schema = provider.schema();
1006                let source = provider.as_any();
1007
1008                let projection = match projection {
1009                    None => None,
1010                    Some(columns) => {
1011                        let column_names = columns
1012                            .iter()
1013                            .map(|i| schema.field(*i).name().to_owned())
1014                            .collect();
1015                        Some(protobuf::ProjectionColumns {
1016                            columns: column_names,
1017                        })
1018                    }
1019                };
1020
1021                let filters: Vec<protobuf::LogicalExprNode> =
1022                    serialize_exprs(filters, extension_codec)?;
1023
1024                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1025                    let any = listing_table.options().format.as_any();
1026                    let file_format_type = {
1027                        let mut maybe_some_type = None;
1028
1029                        #[cfg(feature = "parquet")]
1030                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1031                            let options = parquet.options();
1032                            maybe_some_type =
1033                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1034                                    options: Some(options.try_into()?),
1035                                }));
1036                        };
1037
1038                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1039                            let options = csv.options();
1040                            maybe_some_type =
1041                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1042                                    options: Some(options.try_into()?),
1043                                }));
1044                        }
1045
1046                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1047                            let options = json.options();
1048                            maybe_some_type =
1049                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1050                                    options: Some(options.try_into()?),
1051                                }))
1052                        }
1053
1054                        #[cfg(feature = "avro")]
1055                        if any.is::<AvroFormat>() {
1056                            maybe_some_type =
1057                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1058                        }
1059
1060                        if let Some(file_format_type) = maybe_some_type {
1061                            file_format_type
1062                        } else {
1063                            return Err(proto_error(format!(
1064                            "Error converting file format, {:?} is invalid as a datafusion format.",
1065                            listing_table.options().format
1066                        )));
1067                        }
1068                    };
1069
1070                    let options = listing_table.options();
1071
1072                    let mut builder = SchemaBuilder::from(schema.as_ref());
1073                    for (idx, field) in schema.fields().iter().enumerate().rev() {
1074                        if options
1075                            .table_partition_cols
1076                            .iter()
1077                            .any(|(name, _)| name == field.name())
1078                        {
1079                            builder.remove(idx);
1080                        }
1081                    }
1082
1083                    let schema = builder.finish();
1084
1085                    let schema: protobuf::Schema = (&schema).try_into()?;
1086
1087                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1088                    for order in &options.file_sort_order {
1089                        let expr_vec = SortExprNodeCollection {
1090                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1091                        };
1092                        exprs_vec.push(expr_vec);
1093                    }
1094
1095                    let partition_columns = options
1096                        .table_partition_cols
1097                        .iter()
1098                        .map(|(name, arrow_type)| {
1099                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1100                                .map_err(|e| {
1101                                    proto_error(format!(
1102                                        "Received an unknown ArrowType: {e}"
1103                                    ))
1104                                })?;
1105                            Ok(protobuf::PartitionColumn {
1106                                name: name.clone(),
1107                                arrow_type: Some(arrow_type),
1108                            })
1109                        })
1110                        .collect::<Result<Vec<_>>>()?;
1111
1112                    Ok(LogicalPlanNode {
1113                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1114                            protobuf::ListingTableScanNode {
1115                                file_format_type: Some(file_format_type),
1116                                table_name: Some(table_name.clone().into()),
1117                                collect_stat: options.collect_stat,
1118                                file_extension: options.file_extension.clone(),
1119                                table_partition_cols: partition_columns,
1120                                paths: listing_table
1121                                    .table_paths()
1122                                    .iter()
1123                                    .map(|x| x.to_string())
1124                                    .collect(),
1125                                schema: Some(schema),
1126                                projection,
1127                                filters,
1128                                target_partitions: options.target_partitions as u32,
1129                                file_sort_order: exprs_vec,
1130                            },
1131                        )),
1132                    })
1133                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1134                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1135                    Ok(LogicalPlanNode {
1136                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1137                            protobuf::ViewTableScanNode {
1138                                table_name: Some(table_name.clone().into()),
1139                                input: Some(Box::new(
1140                                    LogicalPlanNode::try_from_logical_plan(
1141                                        view_table.logical_plan(),
1142                                        extension_codec,
1143                                    )?,
1144                                )),
1145                                schema: Some(schema),
1146                                projection,
1147                                definition: view_table
1148                                    .definition()
1149                                    .map(|s| s.to_string())
1150                                    .unwrap_or_default(),
1151                            },
1152                        ))),
1153                    })
1154                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1155                {
1156                    let name = cte_work_table.name().to_string();
1157                    let schema = cte_work_table.schema();
1158                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1159
1160                    Ok(LogicalPlanNode {
1161                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1162                            protobuf::CteWorkTableScanNode {
1163                                name,
1164                                schema: Some(schema),
1165                            },
1166                        )),
1167                    })
1168                } else {
1169                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1170                    let mut bytes = vec![];
1171                    extension_codec
1172                        .try_encode_table_provider(table_name, provider, &mut bytes)
1173                        .map_err(|e| context!("Error serializing custom table", e))?;
1174                    let scan = CustomScan(CustomTableScanNode {
1175                        table_name: Some(table_name.clone().into()),
1176                        projection,
1177                        schema: Some(schema),
1178                        filters,
1179                        custom_table_data: bytes,
1180                    });
1181                    let node = LogicalPlanNode {
1182                        logical_plan_type: Some(scan),
1183                    };
1184                    Ok(node)
1185                }
1186            }
1187            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1188                Ok(LogicalPlanNode {
1189                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1190                        protobuf::ProjectionNode {
1191                            input: Some(Box::new(
1192                                LogicalPlanNode::try_from_logical_plan(
1193                                    input.as_ref(),
1194                                    extension_codec,
1195                                )?,
1196                            )),
1197                            expr: serialize_exprs(expr, extension_codec)?,
1198                            optional_alias: None,
1199                        },
1200                    ))),
1201                })
1202            }
1203            LogicalPlan::Filter(filter) => {
1204                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1205                    filter.input.as_ref(),
1206                    extension_codec,
1207                )?;
1208                Ok(LogicalPlanNode {
1209                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1210                        protobuf::SelectionNode {
1211                            input: Some(Box::new(input)),
1212                            expr: Some(serialize_expr(
1213                                &filter.predicate,
1214                                extension_codec,
1215                            )?),
1216                        },
1217                    ))),
1218                })
1219            }
1220            LogicalPlan::Distinct(Distinct::All(input)) => {
1221                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1222                    input.as_ref(),
1223                    extension_codec,
1224                )?;
1225                Ok(LogicalPlanNode {
1226                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1227                        protobuf::DistinctNode {
1228                            input: Some(Box::new(input)),
1229                        },
1230                    ))),
1231                })
1232            }
1233            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1234                on_expr,
1235                select_expr,
1236                sort_expr,
1237                input,
1238                ..
1239            })) => {
1240                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1241                    input.as_ref(),
1242                    extension_codec,
1243                )?;
1244                let sort_expr = match sort_expr {
1245                    None => vec![],
1246                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1247                };
1248                Ok(LogicalPlanNode {
1249                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1250                        protobuf::DistinctOnNode {
1251                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1252                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1253                            sort_expr,
1254                            input: Some(Box::new(input)),
1255                        },
1256                    ))),
1257                })
1258            }
1259            LogicalPlan::Window(Window {
1260                input, window_expr, ..
1261            }) => {
1262                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1263                    input.as_ref(),
1264                    extension_codec,
1265                )?;
1266                Ok(LogicalPlanNode {
1267                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1268                        protobuf::WindowNode {
1269                            input: Some(Box::new(input)),
1270                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1271                        },
1272                    ))),
1273                })
1274            }
1275            LogicalPlan::Aggregate(Aggregate {
1276                group_expr,
1277                aggr_expr,
1278                input,
1279                ..
1280            }) => {
1281                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1282                    input.as_ref(),
1283                    extension_codec,
1284                )?;
1285                Ok(LogicalPlanNode {
1286                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1287                        protobuf::AggregateNode {
1288                            input: Some(Box::new(input)),
1289                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1290                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1291                        },
1292                    ))),
1293                })
1294            }
1295            LogicalPlan::Join(Join {
1296                left,
1297                right,
1298                on,
1299                filter,
1300                join_type,
1301                join_constraint,
1302                null_equality,
1303                ..
1304            }) => {
1305                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1306                    left.as_ref(),
1307                    extension_codec,
1308                )?;
1309                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1310                    right.as_ref(),
1311                    extension_codec,
1312                )?;
1313                let (left_join_key, right_join_key) = on
1314                    .iter()
1315                    .map(|(l, r)| {
1316                        Ok((
1317                            serialize_expr(l, extension_codec)?,
1318                            serialize_expr(r, extension_codec)?,
1319                        ))
1320                    })
1321                    .collect::<Result<Vec<_>, ToProtoError>>()?
1322                    .into_iter()
1323                    .unzip();
1324                let join_type: protobuf::JoinType = join_type.to_owned().into();
1325                let join_constraint: protobuf::JoinConstraint =
1326                    join_constraint.to_owned().into();
1327                let null_equality: protobuf::NullEquality =
1328                    null_equality.to_owned().into();
1329                let filter = filter
1330                    .as_ref()
1331                    .map(|e| serialize_expr(e, extension_codec))
1332                    .map_or(Ok(None), |v| v.map(Some))?;
1333                Ok(LogicalPlanNode {
1334                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1335                        protobuf::JoinNode {
1336                            left: Some(Box::new(left)),
1337                            right: Some(Box::new(right)),
1338                            join_type: join_type.into(),
1339                            join_constraint: join_constraint.into(),
1340                            left_join_key,
1341                            right_join_key,
1342                            null_equality: null_equality.into(),
1343                            filter,
1344                        },
1345                    ))),
1346                })
1347            }
1348            LogicalPlan::Subquery(_) => {
1349                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1350            }
1351            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1352                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1353                    input.as_ref(),
1354                    extension_codec,
1355                )?;
1356                Ok(LogicalPlanNode {
1357                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1358                        protobuf::SubqueryAliasNode {
1359                            input: Some(Box::new(input)),
1360                            alias: Some((*alias).clone().into()),
1361                        },
1362                    ))),
1363                })
1364            }
1365            LogicalPlan::Limit(limit) => {
1366                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1367                    limit.input.as_ref(),
1368                    extension_codec,
1369                )?;
1370                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1371                    return Err(proto_error(
1372                        "LogicalPlan::Limit only supports literal skip values",
1373                    ));
1374                };
1375                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1376                    return Err(proto_error(
1377                        "LogicalPlan::Limit only supports literal fetch values",
1378                    ));
1379                };
1380
1381                Ok(LogicalPlanNode {
1382                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1383                        protobuf::LimitNode {
1384                            input: Some(Box::new(input)),
1385                            skip: skip as i64,
1386                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1387                        },
1388                    ))),
1389                })
1390            }
1391            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1392                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1393                    input.as_ref(),
1394                    extension_codec,
1395                )?;
1396                let sort_expr: Vec<protobuf::SortExprNode> =
1397                    serialize_sorts(expr, extension_codec)?;
1398                Ok(LogicalPlanNode {
1399                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1400                        protobuf::SortNode {
1401                            input: Some(Box::new(input)),
1402                            expr: sort_expr,
1403                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1404                        },
1405                    ))),
1406                })
1407            }
1408            LogicalPlan::Repartition(Repartition {
1409                input,
1410                partitioning_scheme,
1411            }) => {
1412                use datafusion::logical_expr::Partitioning;
1413                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414                    input.as_ref(),
1415                    extension_codec,
1416                )?;
1417
1418                // Assumed common usize field was batch size
1419                // Used u64 to avoid any nastiness involving large values; most data clusters are probably uniformly 64-bit anyway
1420                use protobuf::repartition_node::PartitionMethod;
1421
1422                let pb_partition_method = match partitioning_scheme {
1423                    Partitioning::Hash(exprs, partition_count) => {
1424                        PartitionMethod::Hash(protobuf::HashRepartition {
1425                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1426                            partition_count: *partition_count as u64,
1427                        })
1428                    }
1429                    Partitioning::RoundRobinBatch(partition_count) => {
1430                        PartitionMethod::RoundRobin(*partition_count as u64)
1431                    }
1432                    Partitioning::DistributeBy(_) => {
1433                        return not_impl_err!("DistributeBy")
1434                    }
1435                };
1436
1437                Ok(LogicalPlanNode {
1438                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1439                        protobuf::RepartitionNode {
1440                            input: Some(Box::new(input)),
1441                            partition_method: Some(pb_partition_method),
1442                        },
1443                    ))),
1444                })
1445            }
1446            LogicalPlan::EmptyRelation(EmptyRelation {
1447                produce_one_row, ..
1448            }) => Ok(LogicalPlanNode {
1449                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1450                    protobuf::EmptyRelationNode {
1451                        produce_one_row: *produce_one_row,
1452                    },
1453                )),
1454            }),
1455            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1456                CreateExternalTable {
1457                    name,
1458                    location,
1459                    file_type,
1460                    schema: df_schema,
1461                    table_partition_cols,
1462                    if_not_exists,
1463                    definition,
1464                    order_exprs,
1465                    unbounded,
1466                    options,
1467                    constraints,
1468                    column_defaults,
1469                    temporary,
1470                },
1471            )) => {
1472                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1473                for order in order_exprs {
1474                    let temp = SortExprNodeCollection {
1475                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1476                    };
1477                    converted_order_exprs.push(temp);
1478                }
1479
1480                let mut converted_column_defaults =
1481                    HashMap::with_capacity(column_defaults.len());
1482                for (col_name, expr) in column_defaults {
1483                    converted_column_defaults
1484                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1485                }
1486
1487                Ok(LogicalPlanNode {
1488                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1489                        protobuf::CreateExternalTableNode {
1490                            name: Some(name.clone().into()),
1491                            location: location.clone(),
1492                            file_type: file_type.clone(),
1493                            schema: Some(df_schema.try_into()?),
1494                            table_partition_cols: table_partition_cols.clone(),
1495                            if_not_exists: *if_not_exists,
1496                            temporary: *temporary,
1497                            order_exprs: converted_order_exprs,
1498                            definition: definition.clone().unwrap_or_default(),
1499                            unbounded: *unbounded,
1500                            options: options.clone(),
1501                            constraints: Some(constraints.clone().into()),
1502                            column_defaults: converted_column_defaults,
1503                        },
1504                    )),
1505                })
1506            }
1507            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1508                name,
1509                input,
1510                or_replace,
1511                definition,
1512                temporary,
1513            })) => Ok(LogicalPlanNode {
1514                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1515                    protobuf::CreateViewNode {
1516                        name: Some(name.clone().into()),
1517                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1518                            input,
1519                            extension_codec,
1520                        )?)),
1521                        or_replace: *or_replace,
1522                        temporary: *temporary,
1523                        definition: definition.clone().unwrap_or_default(),
1524                    },
1525                ))),
1526            }),
1527            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1528                CreateCatalogSchema {
1529                    schema_name,
1530                    if_not_exists,
1531                    schema: df_schema,
1532                },
1533            )) => Ok(LogicalPlanNode {
1534                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1535                    protobuf::CreateCatalogSchemaNode {
1536                        schema_name: schema_name.clone(),
1537                        if_not_exists: *if_not_exists,
1538                        schema: Some(df_schema.try_into()?),
1539                    },
1540                )),
1541            }),
1542            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1543                catalog_name,
1544                if_not_exists,
1545                schema: df_schema,
1546            })) => Ok(LogicalPlanNode {
1547                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1548                    protobuf::CreateCatalogNode {
1549                        catalog_name: catalog_name.clone(),
1550                        if_not_exists: *if_not_exists,
1551                        schema: Some(df_schema.try_into()?),
1552                    },
1553                )),
1554            }),
1555            LogicalPlan::Analyze(a) => {
1556                let input = LogicalPlanNode::try_from_logical_plan(
1557                    a.input.as_ref(),
1558                    extension_codec,
1559                )?;
1560                Ok(LogicalPlanNode {
1561                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1562                        protobuf::AnalyzeNode {
1563                            input: Some(Box::new(input)),
1564                            verbose: a.verbose,
1565                        },
1566                    ))),
1567                })
1568            }
1569            LogicalPlan::Explain(a) => {
1570                let input = LogicalPlanNode::try_from_logical_plan(
1571                    a.plan.as_ref(),
1572                    extension_codec,
1573                )?;
1574                Ok(LogicalPlanNode {
1575                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1576                        protobuf::ExplainNode {
1577                            input: Some(Box::new(input)),
1578                            verbose: a.verbose,
1579                        },
1580                    ))),
1581                })
1582            }
1583            LogicalPlan::Union(union) => {
1584                let inputs: Vec<LogicalPlanNode> = union
1585                    .inputs
1586                    .iter()
1587                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1588                    .collect::<Result<_>>()?;
1589                Ok(LogicalPlanNode {
1590                    logical_plan_type: Some(LogicalPlanType::Union(
1591                        protobuf::UnionNode { inputs },
1592                    )),
1593                })
1594            }
1595            LogicalPlan::Extension(extension) => {
1596                let mut buf: Vec<u8> = vec![];
1597                extension_codec.try_encode(extension, &mut buf)?;
1598
1599                let inputs: Vec<LogicalPlanNode> = extension
1600                    .node
1601                    .inputs()
1602                    .iter()
1603                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1604                    .collect::<Result<_>>()?;
1605
1606                Ok(LogicalPlanNode {
1607                    logical_plan_type: Some(LogicalPlanType::Extension(
1608                        LogicalExtensionNode { node: buf, inputs },
1609                    )),
1610                })
1611            }
1612            LogicalPlan::Statement(Statement::Prepare(Prepare {
1613                name,
1614                data_types,
1615                input,
1616            })) => {
1617                let input =
1618                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1619                Ok(LogicalPlanNode {
1620                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1621                        protobuf::PrepareNode {
1622                            name: name.clone(),
1623                            data_types: data_types
1624                                .iter()
1625                                .map(|t| t.try_into())
1626                                .collect::<Result<Vec<_>, _>>()?,
1627                            input: Some(Box::new(input)),
1628                        },
1629                    ))),
1630                })
1631            }
1632            LogicalPlan::Unnest(Unnest {
1633                input,
1634                exec_columns,
1635                list_type_columns,
1636                struct_type_columns,
1637                dependency_indices,
1638                schema,
1639                options,
1640            }) => {
1641                let input =
1642                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1643                let proto_unnest_list_items = list_type_columns
1644                    .iter()
1645                    .map(|(index, ul)| ColumnUnnestListItem {
1646                        input_index: *index as _,
1647                        recursion: Some(ColumnUnnestListRecursion {
1648                            output_column: Some(ul.output_column.to_owned().into()),
1649                            depth: ul.depth as _,
1650                        }),
1651                    })
1652                    .collect();
1653                Ok(LogicalPlanNode {
1654                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1655                        protobuf::UnnestNode {
1656                            input: Some(Box::new(input)),
1657                            exec_columns: exec_columns
1658                                .iter()
1659                                .map(|col| col.into())
1660                                .collect(),
1661                            list_type_columns: proto_unnest_list_items,
1662                            struct_type_columns: struct_type_columns
1663                                .iter()
1664                                .map(|c| *c as u64)
1665                                .collect(),
1666                            dependency_indices: dependency_indices
1667                                .iter()
1668                                .map(|c| *c as u64)
1669                                .collect(),
1670                            schema: Some(schema.try_into()?),
1671                            options: Some(options.into()),
1672                        },
1673                    ))),
1674                })
1675            }
1676            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1677                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1678            )),
1679            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1680                "LogicalPlan serde is not yet implemented for CreateIndex",
1681            )),
1682            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1683                "LogicalPlan serde is not yet implemented for DropTable",
1684            )),
1685            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1686                name,
1687                if_exists,
1688                schema,
1689            })) => Ok(LogicalPlanNode {
1690                logical_plan_type: Some(LogicalPlanType::DropView(
1691                    protobuf::DropViewNode {
1692                        name: Some(name.clone().into()),
1693                        if_exists: *if_exists,
1694                        schema: Some(schema.try_into()?),
1695                    },
1696                )),
1697            }),
1698            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1699                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1700            )),
1701            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1702                "LogicalPlan serde is not yet implemented for CreateFunction",
1703            )),
1704            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1705                "LogicalPlan serde is not yet implemented for DropFunction",
1706            )),
1707            LogicalPlan::Statement(_) => Err(proto_error(
1708                "LogicalPlan serde is not yet implemented for Statement",
1709            )),
1710            LogicalPlan::Dml(DmlStatement {
1711                table_name,
1712                target,
1713                op,
1714                input,
1715                ..
1716            }) => {
1717                let input =
1718                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1719                let dml_type: dml_node::Type = op.into();
1720                Ok(LogicalPlanNode {
1721                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1722                        input: Some(Box::new(input)),
1723                        target: Some(Box::new(from_table_source(
1724                            table_name.clone(),
1725                            Arc::clone(target),
1726                            extension_codec,
1727                        )?)),
1728                        table_name: Some(table_name.clone().into()),
1729                        dml_type: dml_type.into(),
1730                    }))),
1731                })
1732            }
1733            LogicalPlan::Copy(dml::CopyTo {
1734                input,
1735                output_url,
1736                file_type,
1737                partition_by,
1738                ..
1739            }) => {
1740                let input =
1741                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1742                let mut buf = Vec::new();
1743                extension_codec
1744                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1745
1746                Ok(LogicalPlanNode {
1747                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1748                        protobuf::CopyToNode {
1749                            input: Some(Box::new(input)),
1750                            output_url: output_url.to_string(),
1751                            file_type: buf,
1752                            partition_by: partition_by.clone(),
1753                        },
1754                    ))),
1755                })
1756            }
1757            LogicalPlan::DescribeTable(_) => Err(proto_error(
1758                "LogicalPlan serde is not yet implemented for DescribeTable",
1759            )),
1760            LogicalPlan::RecursiveQuery(recursive) => {
1761                let static_term = LogicalPlanNode::try_from_logical_plan(
1762                    recursive.static_term.as_ref(),
1763                    extension_codec,
1764                )?;
1765                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1766                    recursive.recursive_term.as_ref(),
1767                    extension_codec,
1768                )?;
1769
1770                Ok(LogicalPlanNode {
1771                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1772                        protobuf::RecursiveQueryNode {
1773                            name: recursive.name.clone(),
1774                            static_term: Some(Box::new(static_term)),
1775                            recursive_term: Some(Box::new(recursive_term)),
1776                            is_distinct: recursive.is_distinct,
1777                        },
1778                    ))),
1779                })
1780            }
1781        }
1782    }
1783}