Skip to main content

datafusion_proto/logical_plan/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24    ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25    CustomTableScanNode, DmlNode, SortExprNodeCollection, dml_node,
26};
27use crate::{
28    convert_required, into_required,
29    protobuf::{
30        self, LogicalExtensionNode, LogicalPlanNode,
31        listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType,
32    },
33};
34
35use crate::protobuf::{ToProtoError, proto_error};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_catalog::empty::EmptyTable;
39use datafusion_common::file_options::file_type::FileType;
40use datafusion_common::format::ExplainFormat;
41use datafusion_common::{
42    Result, TableReference, ToDFSchema, assert_or_internal_err, context,
43    internal_datafusion_err, internal_err, not_impl_err, plan_err,
44};
45use datafusion_datasource::file_format::FileFormat;
46use datafusion_datasource::file_format::{
47    FileFormatFactory, file_type_to_format, format_as_file_type,
48};
49use datafusion_datasource_arrow::file_format::{ArrowFormat, ArrowFormatFactory};
50#[cfg(feature = "avro")]
51use datafusion_datasource_avro::file_format::AvroFormat;
52use datafusion_datasource_csv::file_format::{CsvFormat, CsvFormatFactory};
53use datafusion_datasource_json::file_format::{
54    JsonFormat as OtherNdJsonFormat, JsonFormatFactory,
55};
56#[cfg(feature = "parquet")]
57use datafusion_datasource_parquet::file_format::{ParquetFormat, ParquetFormatFactory};
58use datafusion_expr::{
59    AggregateUDF, DmlStatement, FetchType, HigherOrderUDF, RecursiveQuery, SkipType,
60    TableSource, Unnest,
61};
62use datafusion_expr::{
63    DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
64    Statement, WindowUDF, dml,
65    logical_plan::{
66        Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
67        DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare,
68        Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
69        builder::project,
70    },
71};
72
73use self::to_proto::{serialize_expr, serialize_exprs};
74use crate::logical_plan::to_proto::serialize_sorts;
75use datafusion_catalog::TableProvider;
76use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
77use datafusion_catalog::view::ViewTable;
78use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
79use datafusion_datasource::ListingTableUrl;
80use datafusion_execution::TaskContext;
81use prost::Message;
82use prost::bytes::BufMut;
83
84pub mod file_formats;
85pub mod from_proto;
86pub mod to_proto;
87
88pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
89    fn try_decode(buf: &[u8]) -> Result<Self>
90    where
91        Self: Sized;
92
93    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
94    where
95        B: BufMut,
96        Self: Sized;
97
98    fn try_into_logical_plan(
99        &self,
100        ctx: &TaskContext,
101        extension_codec: &dyn LogicalExtensionCodec,
102    ) -> Result<LogicalPlan>;
103
104    fn try_from_logical_plan(
105        plan: &LogicalPlan,
106        extension_codec: &dyn LogicalExtensionCodec,
107    ) -> Result<Self>
108    where
109        Self: Sized;
110}
111
112pub trait LogicalExtensionCodec: Debug + Send + Sync + std::any::Any {
113    fn try_decode(
114        &self,
115        buf: &[u8],
116        inputs: &[LogicalPlan],
117        ctx: &TaskContext,
118    ) -> Result<Extension>;
119
120    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
121
122    fn try_decode_table_provider(
123        &self,
124        buf: &[u8],
125        table_ref: &TableReference,
126        schema: SchemaRef,
127        ctx: &TaskContext,
128    ) -> Result<Arc<dyn TableProvider>>;
129
130    fn try_encode_table_provider(
131        &self,
132        table_ref: &TableReference,
133        node: Arc<dyn TableProvider>,
134        buf: &mut Vec<u8>,
135    ) -> Result<()>;
136
137    fn try_decode_file_format(
138        &self,
139        _buf: &[u8],
140        _ctx: &TaskContext,
141    ) -> Result<Arc<dyn FileFormatFactory>> {
142        not_impl_err!("LogicalExtensionCodec is not provided for file format")
143    }
144
145    fn try_encode_file_format(
146        &self,
147        _buf: &mut Vec<u8>,
148        _node: Arc<dyn FileFormatFactory>,
149    ) -> Result<()> {
150        Ok(())
151    }
152
153    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
154        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
155    }
156
157    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
158        Ok(())
159    }
160
161    fn try_decode_higher_order_function(
162        &self,
163        name: &str,
164        _buf: &[u8],
165    ) -> Result<Arc<HigherOrderUDF>> {
166        not_impl_err!(
167            "LogicalExtensionCodec is not provided for higher order function {name}"
168        )
169    }
170
171    fn try_encode_higher_order_function(
172        &self,
173        _node: &HigherOrderUDF,
174        _buf: &mut Vec<u8>,
175    ) -> Result<()> {
176        Ok(())
177    }
178
179    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
180        not_impl_err!(
181            "LogicalExtensionCodec is not provided for aggregate function {name}"
182        )
183    }
184
185    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
186        Ok(())
187    }
188
189    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
190        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
191    }
192
193    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
194        Ok(())
195    }
196}
197
198#[derive(Debug, Clone)]
199pub struct DefaultLogicalExtensionCodec {}
200
201impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
202    fn try_decode(
203        &self,
204        _buf: &[u8],
205        _inputs: &[LogicalPlan],
206        _ctx: &TaskContext,
207    ) -> Result<Extension> {
208        not_impl_err!("LogicalExtensionCodec is not provided")
209    }
210
211    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
212        not_impl_err!("LogicalExtensionCodec is not provided")
213    }
214
215    fn try_decode_table_provider(
216        &self,
217        _buf: &[u8],
218        _table_ref: &TableReference,
219        _schema: SchemaRef,
220        _ctx: &TaskContext,
221    ) -> Result<Arc<dyn TableProvider>> {
222        not_impl_err!("LogicalExtensionCodec is not provided")
223    }
224
225    fn try_encode_table_provider(
226        &self,
227        _table_ref: &TableReference,
228        _node: Arc<dyn TableProvider>,
229        _buf: &mut Vec<u8>,
230    ) -> Result<()> {
231        not_impl_err!("LogicalExtensionCodec is not provided")
232    }
233
234    fn try_decode_file_format(
235        &self,
236        buf: &[u8],
237        ctx: &TaskContext,
238    ) -> Result<Arc<dyn FileFormatFactory>> {
239        let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
240            internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
241        })?;
242
243        let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
244            internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
245        })?;
246
247        match kind {
248            protobuf::FileFormatKind::Csv => file_formats::CsvLogicalExtensionCodec
249                .try_decode_file_format(&proto.encoded_file_format, ctx),
250            protobuf::FileFormatKind::Json => file_formats::JsonLogicalExtensionCodec
251                .try_decode_file_format(&proto.encoded_file_format, ctx),
252            #[cfg(feature = "parquet")]
253            protobuf::FileFormatKind::Parquet => {
254                file_formats::ParquetLogicalExtensionCodec
255                    .try_decode_file_format(&proto.encoded_file_format, ctx)
256            }
257            protobuf::FileFormatKind::Arrow => file_formats::ArrowLogicalExtensionCodec
258                .try_decode_file_format(&proto.encoded_file_format, ctx),
259            protobuf::FileFormatKind::Avro => file_formats::AvroLogicalExtensionCodec
260                .try_decode_file_format(&proto.encoded_file_format, ctx),
261            #[cfg(not(feature = "parquet"))]
262            protobuf::FileFormatKind::Parquet => {
263                not_impl_err!("Parquet support requires the 'parquet' feature")
264            }
265            protobuf::FileFormatKind::Unspecified => {
266                not_impl_err!("Unspecified file format kind")
267            }
268        }
269    }
270
271    fn try_encode_file_format(
272        &self,
273        buf: &mut Vec<u8>,
274        node: Arc<dyn FileFormatFactory>,
275    ) -> Result<()> {
276        let mut encoded_file_format = Vec::new();
277
278        let kind = if node.downcast_ref::<CsvFormatFactory>().is_some() {
279            file_formats::CsvLogicalExtensionCodec
280                .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
281            protobuf::FileFormatKind::Csv
282        } else if node.downcast_ref::<JsonFormatFactory>().is_some() {
283            file_formats::JsonLogicalExtensionCodec
284                .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
285            protobuf::FileFormatKind::Json
286        } else if node.downcast_ref::<ArrowFormatFactory>().is_some() {
287            file_formats::ArrowLogicalExtensionCodec
288                .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
289            protobuf::FileFormatKind::Arrow
290        } else {
291            #[cfg(feature = "parquet")]
292            {
293                if node.downcast_ref::<ParquetFormatFactory>().is_some() {
294                    file_formats::ParquetLogicalExtensionCodec.try_encode_file_format(
295                        &mut encoded_file_format,
296                        Arc::clone(&node),
297                    )?;
298                    protobuf::FileFormatKind::Parquet
299                } else {
300                    return not_impl_err!(
301                        "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec"
302                    );
303                }
304            }
305            #[cfg(not(feature = "parquet"))]
306            {
307                return not_impl_err!(
308                    "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec"
309                );
310            }
311        };
312
313        let proto = protobuf::FileFormatProto {
314            kind: kind as i32,
315            encoded_file_format,
316        };
317        proto.encode(buf).map_err(|e| {
318            internal_datafusion_err!("Failed to encode FileFormatProto: {e}")
319        })?;
320        Ok(())
321    }
322}
323
324#[macro_export]
325macro_rules! into_logical_plan {
326    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
327        if let Some(field) = $PB.as_ref() {
328            field.as_ref().try_into_logical_plan($CTX, $CODEC)
329        } else {
330            Err(proto_error("Missing required field in protobuf"))
331        }
332    }};
333}
334
335fn from_table_reference(
336    table_ref: Option<&protobuf::TableReference>,
337    error_context: &str,
338) -> Result<TableReference> {
339    let table_ref = table_ref.ok_or_else(|| {
340        internal_datafusion_err!(
341            "Protobuf deserialization error, {error_context} was missing required field name."
342        )
343    })?;
344
345    Ok(table_ref.clone().try_into()?)
346}
347
348/// Converts [LogicalPlan::TableScan] to [TableSource]
349/// method to be used to deserialize nodes
350/// serialized by [from_table_source]
351fn to_table_source(
352    node: &Option<Box<LogicalPlanNode>>,
353    ctx: &TaskContext,
354    extension_codec: &dyn LogicalExtensionCodec,
355) -> Result<Arc<dyn TableSource>> {
356    if let Some(node) = node {
357        match node.try_into_logical_plan(ctx, extension_codec)? {
358            LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
359            _ => plan_err!("expected TableScan node"),
360        }
361    } else {
362        plan_err!("LogicalPlanNode should be provided")
363    }
364}
365
366/// converts [TableSource] to [LogicalPlan::TableScan]
367/// using [LogicalPlan::TableScan] was the best approach to
368/// serialize [TableSource] to [LogicalPlan::TableScan]
369fn from_table_source(
370    table_name: TableReference,
371    target: Arc<dyn TableSource>,
372    extension_codec: &dyn LogicalExtensionCodec,
373) -> Result<LogicalPlanNode> {
374    let projected_schema = target.schema().to_dfschema_ref()?;
375    let r = LogicalPlan::TableScan(TableScan {
376        table_name,
377        source: target,
378        projection: None,
379        projected_schema,
380        filters: vec![],
381        fetch: None,
382    });
383
384    LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
385}
386
387impl AsLogicalPlan for LogicalPlanNode {
388    fn try_decode(buf: &[u8]) -> Result<Self>
389    where
390        Self: Sized,
391    {
392        LogicalPlanNode::decode(buf)
393            .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
394    }
395
396    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
397    where
398        B: BufMut,
399        Self: Sized,
400    {
401        self.encode(buf)
402            .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
403    }
404
405    fn try_into_logical_plan(
406        &self,
407        ctx: &TaskContext,
408        extension_codec: &dyn LogicalExtensionCodec,
409    ) -> Result<LogicalPlan> {
410        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
411            proto_error(format!(
412                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
413            ))
414        })?;
415        match plan {
416            LogicalPlanType::Values(values) => {
417                let n_cols = values.n_cols as usize;
418                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
419                    Ok(Vec::new())
420                } else if values.values_list.len() % n_cols != 0 {
421                    internal_err!(
422                        "Invalid values list length, expect {} to be divisible by {}",
423                        values.values_list.len(),
424                        n_cols
425                    )
426                } else {
427                    values
428                        .values_list
429                        .chunks_exact(n_cols)
430                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
431                        .collect::<Result<Vec<_>, _>>()
432                        .map_err(|e| e.into())
433                }?;
434
435                LogicalPlanBuilder::values(values)?.build()
436            }
437            LogicalPlanType::Projection(projection) => {
438                let input: LogicalPlan =
439                    into_logical_plan!(projection.input, ctx, extension_codec)?;
440                let expr: Vec<Expr> =
441                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
442
443                let new_proj = project(input, expr)?;
444                match projection.optional_alias.as_ref() {
445                    Some(a) => match a {
446                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
447                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
448                                Arc::new(new_proj),
449                                alias.clone(),
450                            )?))
451                        }
452                    },
453                    _ => Ok(new_proj),
454                }
455            }
456            LogicalPlanType::Selection(selection) => {
457                let input: LogicalPlan =
458                    into_logical_plan!(selection.input, ctx, extension_codec)?;
459                let expr: Expr = selection
460                    .expr
461                    .as_ref()
462                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
463                    .transpose()?
464                    .ok_or_else(|| proto_error("expression required"))?;
465                LogicalPlanBuilder::from(input).filter(expr)?.build()
466            }
467            LogicalPlanType::Window(window) => {
468                let input: LogicalPlan =
469                    into_logical_plan!(window.input, ctx, extension_codec)?;
470                let window_expr =
471                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
472                LogicalPlanBuilder::from(input).window(window_expr)?.build()
473            }
474            LogicalPlanType::Aggregate(aggregate) => {
475                let input: LogicalPlan =
476                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
477                let group_expr =
478                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
479                let aggr_expr =
480                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
481                LogicalPlanBuilder::from(input)
482                    .aggregate(group_expr, aggr_expr)?
483                    .build()
484            }
485            LogicalPlanType::ListingScan(scan) => {
486                let schema: Schema = convert_required!(scan.schema)?;
487
488                let filters =
489                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
490
491                let mut all_sort_orders = vec![];
492                for order in &scan.file_sort_order {
493                    all_sort_orders.push(from_proto::parse_sorts(
494                        &order.sort_expr_nodes,
495                        ctx,
496                        extension_codec,
497                    )?)
498                }
499
500                let file_format: Arc<dyn FileFormat> =
501                    match scan.file_format_type.as_ref().ok_or_else(|| {
502                        proto_error(format!(
503                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
504                        ))
505                    })? {
506                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
507                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
508                            #[cfg(feature = "parquet")]
509                            {
510                                let mut parquet = ParquetFormat::default();
511                                if let Some(options) = options {
512                                    parquet = parquet.with_options(options.try_into()?)
513                                }
514                                Arc::new(parquet)
515                            }
516                            #[cfg(not(feature = "parquet"))]
517                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
518                        }
519                        FileFormatType::Csv(protobuf::CsvFormat {
520                            options
521                        }) => {
522                            let mut csv = CsvFormat::default();
523                            if let Some(options) = options {
524                                csv = csv.with_options(options.try_into()?)
525                            }
526                            Arc::new(csv)
527                        },
528                        FileFormatType::Json(protobuf::NdJsonFormat {
529                            options
530                        }) => {
531                            let mut json = OtherNdJsonFormat::default();
532                            if let Some(options) = options {
533                                json = json.with_options(options.try_into()?)
534                            }
535                            Arc::new(json)
536                        }
537                        FileFormatType::Avro(..) => {
538                            #[cfg(feature = "avro")]
539                            {
540                                Arc::new(AvroFormat)
541                            }
542                            #[cfg(not(feature = "avro"))]
543                            {
544                                panic!(
545                                    "Unable to process avro file since `avro` feature is not enabled"
546                                );
547                            }
548                        }
549                        FileFormatType::Arrow(..) => {
550                            Arc::new(ArrowFormat)
551                        }
552                    };
553
554                let table_paths = &scan
555                    .paths
556                    .iter()
557                    .map(ListingTableUrl::parse)
558                    .collect::<Result<Vec<_>, _>>()?;
559
560                let partition_columns = scan
561                    .table_partition_cols
562                    .iter()
563                    .map(|col| {
564                        let Some(arrow_type) = col.arrow_type.as_ref() else {
565                            return Err(proto_error(
566                                "Missing Arrow type in partition columns",
567                            ));
568                        };
569                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
570                            proto_error(format!("Received an unknown ArrowType: {e}"))
571                        })?;
572                        Ok((col.name.clone(), arrow_type))
573                    })
574                    .collect::<Result<Vec<_>>>()?;
575
576                let options = ListingOptions::new(file_format)
577                    .with_file_extension(&scan.file_extension)
578                    .with_table_partition_cols(partition_columns)
579                    .with_collect_stat(scan.collect_stat)
580                    .with_target_partitions(scan.target_partitions as usize)
581                    .with_file_sort_order(all_sort_orders);
582
583                let config =
584                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
585                        .with_listing_options(options)
586                        .with_schema(Arc::new(schema));
587
588                let provider = ListingTable::try_new(config)?.with_cache(
589                    ctx.runtime_env().cache_manager.get_file_statistic_cache(),
590                );
591
592                let table_name =
593                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
594
595                let mut projection = None;
596                if let Some(columns) = &scan.projection {
597                    let column_indices = columns
598                        .columns
599                        .iter()
600                        .map(|name| provider.schema().index_of(name))
601                        .collect::<Result<Vec<usize>, _>>()?;
602                    projection = Some(column_indices);
603                }
604
605                LogicalPlanBuilder::scan_with_filters(
606                    table_name,
607                    provider_as_source(Arc::new(provider)),
608                    projection,
609                    filters,
610                )?
611                .build()
612            }
613            LogicalPlanType::CustomScan(scan) => {
614                let schema: Schema = convert_required!(scan.schema)?;
615                let schema = Arc::new(schema);
616                let mut projection = None;
617                if let Some(columns) = &scan.projection {
618                    let column_indices = columns
619                        .columns
620                        .iter()
621                        .map(|name| schema.index_of(name))
622                        .collect::<Result<Vec<usize>, _>>()?;
623                    projection = Some(column_indices);
624                }
625
626                let filters =
627                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
628
629                let table_name =
630                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
631
632                let provider = extension_codec.try_decode_table_provider(
633                    &scan.custom_table_data,
634                    &table_name,
635                    schema,
636                    ctx,
637                )?;
638
639                LogicalPlanBuilder::scan_with_filters(
640                    table_name,
641                    provider_as_source(provider),
642                    projection,
643                    filters,
644                )?
645                .build()
646            }
647            LogicalPlanType::Sort(sort) => {
648                let input: LogicalPlan =
649                    into_logical_plan!(sort.input, ctx, extension_codec)?;
650                let sort_expr: Vec<SortExpr> =
651                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
652                let fetch: Option<usize> = sort.fetch.try_into().ok();
653                LogicalPlanBuilder::from(input)
654                    .sort_with_limit(sort_expr, fetch)?
655                    .build()
656            }
657            LogicalPlanType::Repartition(repartition) => {
658                use datafusion_expr::Partitioning;
659                let input: LogicalPlan =
660                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
661                use protobuf::repartition_node::PartitionMethod;
662                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
663                    internal_datafusion_err!(
664                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
665                    )
666                })?;
667
668                let partitioning_scheme = match pb_partition_method {
669                    PartitionMethod::Hash(protobuf::HashRepartition {
670                        hash_expr: pb_hash_expr,
671                        partition_count,
672                    }) => Partitioning::Hash(
673                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
674                        *partition_count as usize,
675                    ),
676                    PartitionMethod::RoundRobin(partition_count) => {
677                        Partitioning::RoundRobinBatch(*partition_count as usize)
678                    }
679                };
680
681                LogicalPlanBuilder::from(input)
682                    .repartition(partitioning_scheme)?
683                    .build()
684            }
685            LogicalPlanType::EmptyRelation(empty_relation) => {
686                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
687            }
688            LogicalPlanType::CreateExternalTable(create_extern_table) => {
689                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
690                    internal_datafusion_err!(
691                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
692                    )
693                })?;
694
695                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
696                    internal_datafusion_err!(
697                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
698                    )
699                })?;
700                let definition = if !create_extern_table.definition.is_empty() {
701                    Some(create_extern_table.definition.clone())
702                } else {
703                    None
704                };
705
706                let mut order_exprs = vec![];
707                for expr in &create_extern_table.order_exprs {
708                    order_exprs.push(from_proto::parse_sorts(
709                        &expr.sort_expr_nodes,
710                        ctx,
711                        extension_codec,
712                    )?);
713                }
714
715                let mut column_defaults =
716                    HashMap::with_capacity(create_extern_table.column_defaults.len());
717                for (col_name, expr) in &create_extern_table.column_defaults {
718                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
719                    column_defaults.insert(col_name.clone(), expr);
720                }
721
722                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
723                    CreateExternalTable::builder(
724                        from_table_reference(
725                            create_extern_table.name.as_ref(),
726                            "CreateExternalTable",
727                        )?,
728                        create_extern_table.location.clone(),
729                        create_extern_table.file_type.clone(),
730                        pb_schema.try_into()?,
731                    )
732                    .with_partition_cols(create_extern_table.table_partition_cols.clone())
733                    .with_order_exprs(order_exprs)
734                    .with_if_not_exists(create_extern_table.if_not_exists)
735                    .with_or_replace(create_extern_table.or_replace)
736                    .with_temporary(create_extern_table.temporary)
737                    .with_definition(definition)
738                    .with_unbounded(create_extern_table.unbounded)
739                    .with_options(create_extern_table.options.clone())
740                    .with_constraints(constraints.into())
741                    .with_column_defaults(column_defaults)
742                    .build(),
743                )))
744            }
745            LogicalPlanType::CreateView(create_view) => {
746                let plan = create_view
747                    .input.clone().ok_or_else(|| internal_datafusion_err!(
748                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
749                ))?
750                    .try_into_logical_plan(ctx, extension_codec)?;
751                let definition = if !create_view.definition.is_empty() {
752                    Some(create_view.definition.clone())
753                } else {
754                    None
755                };
756
757                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
758                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
759                    temporary: create_view.temporary,
760                    input: Arc::new(plan),
761                    or_replace: create_view.or_replace,
762                    definition,
763                })))
764            }
765            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
766                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
767                    internal_datafusion_err!(
768                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
769                    )
770                })?;
771
772                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
773                    CreateCatalogSchema {
774                        schema_name: create_catalog_schema.schema_name.clone(),
775                        if_not_exists: create_catalog_schema.if_not_exists,
776                        schema: pb_schema.try_into()?,
777                    },
778                )))
779            }
780            LogicalPlanType::CreateCatalog(create_catalog) => {
781                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
782                    internal_datafusion_err!(
783                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
784                    )
785                })?;
786
787                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
788                    CreateCatalog {
789                        catalog_name: create_catalog.catalog_name.clone(),
790                        if_not_exists: create_catalog.if_not_exists,
791                        schema: pb_schema.try_into()?,
792                    },
793                )))
794            }
795            LogicalPlanType::Analyze(analyze) => {
796                let input: LogicalPlan =
797                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
798                LogicalPlanBuilder::from(input)
799                    .explain(analyze.verbose, true)?
800                    .build()
801            }
802            LogicalPlanType::Explain(explain) => {
803                let input: LogicalPlan =
804                    into_logical_plan!(explain.input, ctx, extension_codec)?;
805                let pb_format = protobuf::ExplainFormat::try_from(explain.format)
806                    .map_err(|_| {
807                        proto_error(format!(
808                            "Received an ExplainNode message with unknown ExplainFormat {}",
809                            explain.format
810                        ))
811                    })?;
812                let explain_format = match pb_format {
813                    protobuf::ExplainFormat::Indent => ExplainFormat::Indent,
814                    protobuf::ExplainFormat::Tree => ExplainFormat::Tree,
815                    protobuf::ExplainFormat::Pgjson => ExplainFormat::PostgresJSON,
816                    protobuf::ExplainFormat::Graphviz => ExplainFormat::Graphviz,
817                };
818                let explain_option =
819                    datafusion_expr::logical_plan::ExplainOption::default()
820                        .with_verbose(explain.verbose)
821                        .with_format(explain_format);
822                LogicalPlanBuilder::from(input)
823                    .explain_option_format(explain_option)?
824                    .build()
825            }
826            LogicalPlanType::SubqueryAlias(aliased_relation) => {
827                let input: LogicalPlan =
828                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
829                let alias = from_table_reference(
830                    aliased_relation.alias.as_ref(),
831                    "SubqueryAlias",
832                )?;
833                LogicalPlanBuilder::from(input).alias(alias)?.build()
834            }
835            LogicalPlanType::Limit(limit) => {
836                let input: LogicalPlan =
837                    into_logical_plan!(limit.input, ctx, extension_codec)?;
838                let skip = limit.skip.max(0) as usize;
839
840                let fetch = if limit.fetch < 0 {
841                    None
842                } else {
843                    Some(limit.fetch as usize)
844                };
845
846                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
847            }
848            LogicalPlanType::Join(join) => {
849                let left_keys: Vec<Expr> =
850                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
851                let right_keys: Vec<Expr> =
852                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
853                let join_type =
854                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
855                        proto_error(format!(
856                            "Received a JoinNode message with unknown JoinType {}",
857                            join.join_type
858                        ))
859                    })?;
860                let join_constraint = protobuf::JoinConstraint::try_from(
861                    join.join_constraint,
862                )
863                .map_err(|_| {
864                    proto_error(format!(
865                        "Received a JoinNode message with unknown JoinConstraint {}",
866                        join.join_constraint
867                    ))
868                })?;
869                let filter: Option<Expr> = join
870                    .filter
871                    .as_ref()
872                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
873                    .map_or(Ok(None), |v| v.map(Some))?;
874
875                let builder = LogicalPlanBuilder::from(into_logical_plan!(
876                    join.left,
877                    ctx,
878                    extension_codec
879                )?);
880                let builder = match join_constraint.into() {
881                    JoinConstraint::On => builder.join_with_expr_keys(
882                        into_logical_plan!(join.right, ctx, extension_codec)?,
883                        join_type.into(),
884                        (left_keys, right_keys),
885                        filter,
886                    )?,
887                    JoinConstraint::Using => {
888                        // The equijoin keys in using-join must be column.
889                        let using_keys = left_keys
890                            .into_iter()
891                            .map(|key| {
892                                key.try_as_col().cloned()
893                                    .ok_or_else(|| internal_datafusion_err!(
894                                        "Using join keys must be column references, got: {key:?}"
895                                    ))
896                            })
897                            .collect::<Result<Vec<_>, _>>()?;
898                        builder.join_using(
899                            into_logical_plan!(join.right, ctx, extension_codec)?,
900                            join_type.into(),
901                            using_keys,
902                        )?
903                    }
904                };
905
906                builder.build()
907            }
908            LogicalPlanType::Union(union) => {
909                assert_or_internal_err!(
910                    union.inputs.len() >= 2,
911                    "Protobuf deserialization error, Union requires at least two inputs."
912                );
913                let (first, rest) = union.inputs.split_first().unwrap();
914                let mut builder = LogicalPlanBuilder::from(
915                    first.try_into_logical_plan(ctx, extension_codec)?,
916                );
917
918                for i in rest {
919                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
920                    builder = builder.union(plan)?;
921                }
922                builder.build()
923            }
924            LogicalPlanType::CrossJoin(crossjoin) => {
925                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
926                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
927
928                LogicalPlanBuilder::from(left).cross_join(right)?.build()
929            }
930            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
931                let input_plans: Vec<LogicalPlan> = inputs
932                    .iter()
933                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
934                    .collect::<Result<_>>()?;
935
936                let extension_node =
937                    extension_codec.try_decode(node, &input_plans, ctx)?;
938                Ok(LogicalPlan::Extension(extension_node))
939            }
940            LogicalPlanType::Distinct(distinct) => {
941                let input: LogicalPlan =
942                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
943                LogicalPlanBuilder::from(input).distinct()?.build()
944            }
945            LogicalPlanType::DistinctOn(distinct_on) => {
946                let input: LogicalPlan =
947                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
948                let on_expr =
949                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
950                let select_expr = from_proto::parse_exprs(
951                    &distinct_on.select_expr,
952                    ctx,
953                    extension_codec,
954                )?;
955                let sort_expr = match distinct_on.sort_expr.len() {
956                    0 => None,
957                    _ => Some(from_proto::parse_sorts(
958                        &distinct_on.sort_expr,
959                        ctx,
960                        extension_codec,
961                    )?),
962                };
963                LogicalPlanBuilder::from(input)
964                    .distinct_on(on_expr, select_expr, sort_expr)?
965                    .build()
966            }
967            LogicalPlanType::ViewScan(scan) => {
968                let schema: Schema = convert_required!(scan.schema)?;
969
970                let mut projection = None;
971                if let Some(columns) = &scan.projection {
972                    let column_indices = columns
973                        .columns
974                        .iter()
975                        .map(|name| schema.index_of(name))
976                        .collect::<Result<Vec<usize>, _>>()?;
977                    projection = Some(column_indices);
978                }
979
980                let input: LogicalPlan =
981                    into_logical_plan!(scan.input, ctx, extension_codec)?;
982
983                let definition = if !scan.definition.is_empty() {
984                    Some(scan.definition.clone())
985                } else {
986                    None
987                };
988
989                let provider = ViewTable::new(input, definition);
990
991                let table_name =
992                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
993
994                LogicalPlanBuilder::scan(
995                    table_name,
996                    provider_as_source(Arc::new(provider)),
997                    projection,
998                )?
999                .build()
1000            }
1001            LogicalPlanType::Prepare(prepare) => {
1002                let input: LogicalPlan =
1003                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
1004                let data_types: Vec<DataType> = prepare
1005                    .data_types
1006                    .iter()
1007                    .map(DataType::try_from)
1008                    .collect::<Result<_, _>>()?;
1009                let fields: Vec<Field> = prepare
1010                    .fields
1011                    .iter()
1012                    .map(Field::try_from)
1013                    .collect::<Result<_, _>>()?;
1014
1015                // If the fields are empty this may have been generated by an
1016                // earlier version of DataFusion, in which case the DataTypes
1017                // can be used to construct the plan.
1018                if fields.is_empty() {
1019                    LogicalPlanBuilder::from(input)
1020                        .prepare(
1021                            prepare.name.clone(),
1022                            data_types
1023                                .into_iter()
1024                                .map(|dt| Field::new("", dt, true).into())
1025                                .collect(),
1026                        )?
1027                        .build()
1028                } else {
1029                    LogicalPlanBuilder::from(input)
1030                        .prepare(
1031                            prepare.name.clone(),
1032                            fields.into_iter().map(|f| f.into()).collect(),
1033                        )?
1034                        .build()
1035                }
1036            }
1037            LogicalPlanType::DropView(dropview) => {
1038                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1039                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
1040                    if_exists: dropview.if_exists,
1041                    schema: Arc::new(convert_required!(dropview.schema)?),
1042                })))
1043            }
1044            LogicalPlanType::CopyTo(copy) => {
1045                let input: LogicalPlan =
1046                    into_logical_plan!(copy.input, ctx, extension_codec)?;
1047
1048                let file_type: Arc<dyn FileType> = format_as_file_type(
1049                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
1050                );
1051
1052                Ok(LogicalPlan::Copy(dml::CopyTo::new(
1053                    Arc::new(input),
1054                    copy.output_url.clone(),
1055                    copy.partition_by.clone(),
1056                    file_type,
1057                    Default::default(),
1058                )))
1059            }
1060            LogicalPlanType::Unnest(unnest) => {
1061                let input: LogicalPlan =
1062                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
1063
1064                LogicalPlanBuilder::from(input)
1065                    .unnest_columns_with_options(
1066                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
1067                        into_required!(unnest.options)?,
1068                    )?
1069                    .build()
1070            }
1071            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
1072                let static_term = recursive_query_node
1073                    .static_term
1074                    .as_ref()
1075                    .ok_or_else(|| internal_datafusion_err!(
1076                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
1077                    ))?
1078                    .try_into_logical_plan(ctx, extension_codec)?;
1079
1080                let recursive_term = recursive_query_node
1081                    .recursive_term
1082                    .as_ref()
1083                    .ok_or_else(|| internal_datafusion_err!(
1084                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
1085                    ))?
1086                    .try_into_logical_plan(ctx, extension_codec)?;
1087
1088                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1089                    name: recursive_query_node.name.clone(),
1090                    static_term: Arc::new(static_term),
1091                    recursive_term: Arc::new(recursive_term),
1092                    is_distinct: recursive_query_node.is_distinct,
1093                }))
1094            }
1095            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
1096                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
1097                let schema = convert_required!(*schema)?;
1098                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
1099                LogicalPlanBuilder::scan(
1100                    name.as_str(),
1101                    provider_as_source(Arc::new(cte_work_table)),
1102                    None,
1103                )?
1104                .build()
1105            }
1106            LogicalPlanType::EmptyTableScan(scan) => {
1107                let schema: Schema = convert_required!(scan.schema)?;
1108                let schema = Arc::new(schema);
1109                let mut projection = None;
1110                if let Some(columns) = &scan.projection {
1111                    let column_indices = columns
1112                        .columns
1113                        .iter()
1114                        .map(|name| schema.index_of(name))
1115                        .collect::<Result<Vec<usize>, _>>()?;
1116                    projection = Some(column_indices);
1117                }
1118
1119                let filters =
1120                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
1121
1122                let table_name =
1123                    from_table_reference(scan.table_name.as_ref(), "EmptyTableScan")?;
1124
1125                let provider = Arc::new(EmptyTable::new(Arc::clone(&schema)));
1126
1127                LogicalPlanBuilder::scan_with_filters(
1128                    table_name,
1129                    provider_as_source(provider),
1130                    projection,
1131                    filters,
1132                )?
1133                .build()
1134            }
1135            LogicalPlanType::Dml(dml_node) => {
1136                Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
1137                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
1138                    to_table_source(&dml_node.target, ctx, extension_codec)?,
1139                    dml_node.dml_type().into(),
1140                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
1141                )))
1142            }
1143        }
1144    }
1145
1146    fn try_from_logical_plan(
1147        plan: &LogicalPlan,
1148        extension_codec: &dyn LogicalExtensionCodec,
1149    ) -> Result<Self>
1150    where
1151        Self: Sized,
1152    {
1153        match plan {
1154            LogicalPlan::Values(Values { values, .. }) => {
1155                let n_cols = if values.is_empty() {
1156                    0
1157                } else {
1158                    values[0].len()
1159                } as u64;
1160                let values_list =
1161                    serialize_exprs(values.iter().flatten(), extension_codec)?;
1162                Ok(LogicalPlanNode {
1163                    logical_plan_type: Some(LogicalPlanType::Values(
1164                        protobuf::ValuesNode {
1165                            n_cols,
1166                            values_list,
1167                        },
1168                    )),
1169                })
1170            }
1171            LogicalPlan::TableScan(TableScan {
1172                table_name,
1173                source,
1174                filters,
1175                projection,
1176                ..
1177            }) => {
1178                let provider = source_as_provider(source)?;
1179                let schema = provider.schema();
1180
1181                let projection = match projection {
1182                    None => None,
1183                    Some(columns) => {
1184                        let column_names = columns
1185                            .iter()
1186                            .map(|i| schema.field(*i).name().to_owned())
1187                            .collect();
1188                        Some(protobuf::ProjectionColumns {
1189                            columns: column_names,
1190                        })
1191                    }
1192                };
1193
1194                let filters: Vec<protobuf::LogicalExprNode> =
1195                    serialize_exprs(filters, extension_codec)?;
1196
1197                if let Some(listing_table) = provider.downcast_ref::<ListingTable>() {
1198                    let format = listing_table.options().format.as_ref();
1199                    let file_format_type = {
1200                        let mut maybe_some_type = None;
1201
1202                        #[cfg(feature = "parquet")]
1203                        if let Some(parquet) = format.downcast_ref::<ParquetFormat>() {
1204                            let options = parquet.options();
1205                            maybe_some_type =
1206                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1207                                    options: Some(options.try_into()?),
1208                                }));
1209                        };
1210
1211                        if let Some(csv) = format.downcast_ref::<CsvFormat>() {
1212                            let options = csv.options();
1213                            maybe_some_type =
1214                                Some(FileFormatType::Csv(protobuf::CsvFormat {
1215                                    options: Some(options.try_into()?),
1216                                }));
1217                        }
1218
1219                        if let Some(json) = format.downcast_ref::<OtherNdJsonFormat>() {
1220                            let options = json.options();
1221                            maybe_some_type =
1222                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
1223                                    options: Some(options.try_into()?),
1224                                }))
1225                        }
1226
1227                        #[cfg(feature = "avro")]
1228                        if format.is::<AvroFormat>() {
1229                            maybe_some_type =
1230                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1231                        }
1232
1233                        if format.is::<ArrowFormat>() {
1234                            maybe_some_type =
1235                                Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1236                        }
1237
1238                        if let Some(file_format_type) = maybe_some_type {
1239                            file_format_type
1240                        } else {
1241                            return Err(proto_error(format!(
1242                                "Error deserializing unknown file format: {:?}",
1243                                listing_table.options().format
1244                            )));
1245                        }
1246                    };
1247
1248                    let options = listing_table.options();
1249
1250                    let mut builder = SchemaBuilder::from(schema.as_ref());
1251                    for (idx, field) in schema.fields().iter().enumerate().rev() {
1252                        if options
1253                            .table_partition_cols
1254                            .iter()
1255                            .any(|(name, _)| name == field.name())
1256                        {
1257                            builder.remove(idx);
1258                        }
1259                    }
1260
1261                    let schema = builder.finish();
1262
1263                    let schema: protobuf::Schema = (&schema).try_into()?;
1264
1265                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1266                    for order in &options.file_sort_order {
1267                        let expr_vec = SortExprNodeCollection {
1268                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1269                        };
1270                        exprs_vec.push(expr_vec);
1271                    }
1272
1273                    let partition_columns = options
1274                        .table_partition_cols
1275                        .iter()
1276                        .map(|(name, arrow_type)| {
1277                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1278                                .map_err(|e| {
1279                                    proto_error(format!(
1280                                        "Received an unknown ArrowType: {e}"
1281                                    ))
1282                                })?;
1283                            Ok(protobuf::PartitionColumn {
1284                                name: name.clone(),
1285                                arrow_type: Some(arrow_type),
1286                            })
1287                        })
1288                        .collect::<Result<Vec<_>>>()?;
1289
1290                    Ok(LogicalPlanNode {
1291                        logical_plan_type: Some(LogicalPlanType::ListingScan(
1292                            protobuf::ListingTableScanNode {
1293                                file_format_type: Some(file_format_type),
1294                                table_name: Some(table_name.clone().into()),
1295                                collect_stat: options.collect_stat,
1296                                file_extension: options.file_extension.clone(),
1297                                table_partition_cols: partition_columns,
1298                                paths: listing_table
1299                                    .table_paths()
1300                                    .iter()
1301                                    .map(|x| x.to_string())
1302                                    .collect(),
1303                                schema: Some(schema),
1304                                projection,
1305                                filters,
1306                                target_partitions: options.target_partitions as u32,
1307                                file_sort_order: exprs_vec,
1308                            },
1309                        )),
1310                    })
1311                } else if let Some(view_table) = provider.downcast_ref::<ViewTable>() {
1312                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1313                    Ok(LogicalPlanNode {
1314                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1315                            protobuf::ViewTableScanNode {
1316                                table_name: Some(table_name.clone().into()),
1317                                input: Some(Box::new(
1318                                    LogicalPlanNode::try_from_logical_plan(
1319                                        view_table.logical_plan(),
1320                                        extension_codec,
1321                                    )?,
1322                                )),
1323                                schema: Some(schema),
1324                                projection,
1325                                definition: view_table
1326                                    .definition()
1327                                    .map(|s| s.to_string())
1328                                    .unwrap_or_default(),
1329                            },
1330                        ))),
1331                    })
1332                } else if let Some(cte_work_table) =
1333                    provider.downcast_ref::<CteWorkTable>()
1334                {
1335                    let name = cte_work_table.name().to_string();
1336                    let schema = cte_work_table.schema();
1337                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1338
1339                    Ok(LogicalPlanNode {
1340                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1341                            protobuf::CteWorkTableScanNode {
1342                                name,
1343                                schema: Some(schema),
1344                            },
1345                        )),
1346                    })
1347                } else if provider.downcast_ref::<EmptyTable>().is_some() {
1348                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1349
1350                    Ok(LogicalPlanNode {
1351                        logical_plan_type: Some(LogicalPlanType::EmptyTableScan(
1352                            protobuf::EmptyTableScanNode {
1353                                table_name: Some(table_name.clone().into()),
1354                                schema: Some(schema),
1355                                projection,
1356                                filters,
1357                            },
1358                        )),
1359                    })
1360                } else {
1361                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
1362                    let mut bytes = vec![];
1363                    extension_codec
1364                        .try_encode_table_provider(table_name, provider, &mut bytes)
1365                        .map_err(|e| context!("Error serializing custom table", e))?;
1366                    let scan = CustomScan(CustomTableScanNode {
1367                        table_name: Some(table_name.clone().into()),
1368                        projection,
1369                        schema: Some(schema),
1370                        filters,
1371                        custom_table_data: bytes,
1372                    });
1373                    let node = LogicalPlanNode {
1374                        logical_plan_type: Some(scan),
1375                    };
1376                    Ok(node)
1377                }
1378            }
1379            LogicalPlan::Projection(Projection { expr, input, .. }) => {
1380                Ok(LogicalPlanNode {
1381                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1382                        protobuf::ProjectionNode {
1383                            input: Some(Box::new(
1384                                LogicalPlanNode::try_from_logical_plan(
1385                                    input.as_ref(),
1386                                    extension_codec,
1387                                )?,
1388                            )),
1389                            expr: serialize_exprs(expr, extension_codec)?,
1390                            optional_alias: None,
1391                        },
1392                    ))),
1393                })
1394            }
1395            LogicalPlan::Filter(filter) => {
1396                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1397                    filter.input.as_ref(),
1398                    extension_codec,
1399                )?;
1400                Ok(LogicalPlanNode {
1401                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1402                        protobuf::SelectionNode {
1403                            input: Some(Box::new(input)),
1404                            expr: Some(Box::new(serialize_expr(
1405                                &filter.predicate,
1406                                extension_codec,
1407                            )?)),
1408                        },
1409                    ))),
1410                })
1411            }
1412            LogicalPlan::Distinct(Distinct::All(input)) => {
1413                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414                    input.as_ref(),
1415                    extension_codec,
1416                )?;
1417                Ok(LogicalPlanNode {
1418                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1419                        protobuf::DistinctNode {
1420                            input: Some(Box::new(input)),
1421                        },
1422                    ))),
1423                })
1424            }
1425            LogicalPlan::Distinct(Distinct::On(DistinctOn {
1426                on_expr,
1427                select_expr,
1428                sort_expr,
1429                input,
1430                ..
1431            })) => {
1432                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1433                    input.as_ref(),
1434                    extension_codec,
1435                )?;
1436                let sort_expr = match sort_expr {
1437                    None => vec![],
1438                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1439                };
1440                Ok(LogicalPlanNode {
1441                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1442                        protobuf::DistinctOnNode {
1443                            on_expr: serialize_exprs(on_expr, extension_codec)?,
1444                            select_expr: serialize_exprs(select_expr, extension_codec)?,
1445                            sort_expr,
1446                            input: Some(Box::new(input)),
1447                        },
1448                    ))),
1449                })
1450            }
1451            LogicalPlan::Window(Window {
1452                input, window_expr, ..
1453            }) => {
1454                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1455                    input.as_ref(),
1456                    extension_codec,
1457                )?;
1458                Ok(LogicalPlanNode {
1459                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1460                        protobuf::WindowNode {
1461                            input: Some(Box::new(input)),
1462                            window_expr: serialize_exprs(window_expr, extension_codec)?,
1463                        },
1464                    ))),
1465                })
1466            }
1467            LogicalPlan::Aggregate(Aggregate {
1468                group_expr,
1469                aggr_expr,
1470                input,
1471                ..
1472            }) => {
1473                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1474                    input.as_ref(),
1475                    extension_codec,
1476                )?;
1477                Ok(LogicalPlanNode {
1478                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1479                        protobuf::AggregateNode {
1480                            input: Some(Box::new(input)),
1481                            group_expr: serialize_exprs(group_expr, extension_codec)?,
1482                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1483                        },
1484                    ))),
1485                })
1486            }
1487            LogicalPlan::Join(Join {
1488                left,
1489                right,
1490                on,
1491                filter,
1492                join_type,
1493                join_constraint,
1494                null_equality,
1495                ..
1496            }) => {
1497                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1498                    left.as_ref(),
1499                    extension_codec,
1500                )?;
1501                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1502                    right.as_ref(),
1503                    extension_codec,
1504                )?;
1505                let (left_join_key, right_join_key) = on
1506                    .iter()
1507                    .map(|(l, r)| {
1508                        Ok((
1509                            serialize_expr(l, extension_codec)?,
1510                            serialize_expr(r, extension_codec)?,
1511                        ))
1512                    })
1513                    .collect::<Result<Vec<_>, ToProtoError>>()?
1514                    .into_iter()
1515                    .unzip();
1516                let join_type: protobuf::JoinType = join_type.to_owned().into();
1517                let join_constraint: protobuf::JoinConstraint =
1518                    join_constraint.to_owned().into();
1519                let null_equality: protobuf::NullEquality =
1520                    null_equality.to_owned().into();
1521                let filter = filter
1522                    .as_ref()
1523                    .map(|e| serialize_expr(e, extension_codec).map(Box::new))
1524                    .map_or(Ok(None), |v| v.map(Some))?;
1525                Ok(LogicalPlanNode {
1526                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1527                        protobuf::JoinNode {
1528                            left: Some(Box::new(left)),
1529                            right: Some(Box::new(right)),
1530                            join_type: join_type.into(),
1531                            join_constraint: join_constraint.into(),
1532                            left_join_key,
1533                            right_join_key,
1534                            null_equality: null_equality.into(),
1535                            filter,
1536                        },
1537                    ))),
1538                })
1539            }
1540            LogicalPlan::Subquery(subquery) => {
1541                // Serialize the inner subquery plan directly — the
1542                // LogicalPlan::Subquery wrapper is reconstructed during
1543                // expression deserialization.
1544                LogicalPlanNode::try_from_logical_plan(
1545                    &subquery.subquery,
1546                    extension_codec,
1547                )
1548            }
1549            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1550                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1551                    input.as_ref(),
1552                    extension_codec,
1553                )?;
1554                Ok(LogicalPlanNode {
1555                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1556                        protobuf::SubqueryAliasNode {
1557                            input: Some(Box::new(input)),
1558                            alias: Some((*alias).clone().into()),
1559                        },
1560                    ))),
1561                })
1562            }
1563            LogicalPlan::Limit(limit) => {
1564                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1565                    limit.input.as_ref(),
1566                    extension_codec,
1567                )?;
1568                let SkipType::Literal(skip) = limit.get_skip_type()? else {
1569                    return Err(proto_error(
1570                        "LogicalPlan::Limit only supports literal skip values",
1571                    ));
1572                };
1573                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1574                    return Err(proto_error(
1575                        "LogicalPlan::Limit only supports literal fetch values",
1576                    ));
1577                };
1578
1579                Ok(LogicalPlanNode {
1580                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1581                        protobuf::LimitNode {
1582                            input: Some(Box::new(input)),
1583                            skip: skip as i64,
1584                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1585                        },
1586                    ))),
1587                })
1588            }
1589            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1590                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1591                    input.as_ref(),
1592                    extension_codec,
1593                )?;
1594                let sort_expr: Vec<protobuf::SortExprNode> =
1595                    serialize_sorts(expr, extension_codec)?;
1596                Ok(LogicalPlanNode {
1597                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1598                        protobuf::SortNode {
1599                            input: Some(Box::new(input)),
1600                            expr: sort_expr,
1601                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1602                        },
1603                    ))),
1604                })
1605            }
1606            LogicalPlan::Repartition(Repartition {
1607                input,
1608                partitioning_scheme,
1609            }) => {
1610                use datafusion_expr::Partitioning;
1611                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1612                    input.as_ref(),
1613                    extension_codec,
1614                )?;
1615
1616                // Assumed common usize field was batch size
1617                // Used u64 to avoid any nastiness involving large values, most data clusters are probably uniformly 64 bits any ways
1618                use protobuf::repartition_node::PartitionMethod;
1619
1620                let pb_partition_method = match partitioning_scheme {
1621                    Partitioning::Hash(exprs, partition_count) => {
1622                        PartitionMethod::Hash(protobuf::HashRepartition {
1623                            hash_expr: serialize_exprs(exprs, extension_codec)?,
1624                            partition_count: *partition_count as u64,
1625                        })
1626                    }
1627                    Partitioning::RoundRobinBatch(partition_count) => {
1628                        PartitionMethod::RoundRobin(*partition_count as u64)
1629                    }
1630                    Partitioning::DistributeBy(_) => {
1631                        return not_impl_err!("DistributeBy");
1632                    }
1633                };
1634
1635                Ok(LogicalPlanNode {
1636                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1637                        protobuf::RepartitionNode {
1638                            input: Some(Box::new(input)),
1639                            partition_method: Some(pb_partition_method),
1640                        },
1641                    ))),
1642                })
1643            }
1644            LogicalPlan::EmptyRelation(EmptyRelation {
1645                produce_one_row, ..
1646            }) => Ok(LogicalPlanNode {
1647                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1648                    protobuf::EmptyRelationNode {
1649                        produce_one_row: *produce_one_row,
1650                    },
1651                )),
1652            }),
1653            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1654                CreateExternalTable {
1655                    name,
1656                    location,
1657                    file_type,
1658                    schema: df_schema,
1659                    table_partition_cols,
1660                    if_not_exists,
1661                    or_replace,
1662                    definition,
1663                    order_exprs,
1664                    unbounded,
1665                    options,
1666                    constraints,
1667                    column_defaults,
1668                    temporary,
1669                },
1670            )) => {
1671                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1672                for order in order_exprs {
1673                    let temp = SortExprNodeCollection {
1674                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1675                    };
1676                    converted_order_exprs.push(temp);
1677                }
1678
1679                let mut converted_column_defaults =
1680                    HashMap::with_capacity(column_defaults.len());
1681                for (col_name, expr) in column_defaults {
1682                    converted_column_defaults
1683                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1684                }
1685
1686                Ok(LogicalPlanNode {
1687                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1688                        protobuf::CreateExternalTableNode {
1689                            name: Some(name.clone().into()),
1690                            location: location.clone(),
1691                            file_type: file_type.clone(),
1692                            schema: Some(df_schema.try_into()?),
1693                            table_partition_cols: table_partition_cols.clone(),
1694                            if_not_exists: *if_not_exists,
1695                            or_replace: *or_replace,
1696                            temporary: *temporary,
1697                            order_exprs: converted_order_exprs,
1698                            definition: definition.clone().unwrap_or_default(),
1699                            unbounded: *unbounded,
1700                            options: options.clone(),
1701                            constraints: Some(constraints.clone().into()),
1702                            column_defaults: converted_column_defaults,
1703                        },
1704                    )),
1705                })
1706            }
1707            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1708                name,
1709                input,
1710                or_replace,
1711                definition,
1712                temporary,
1713            })) => Ok(LogicalPlanNode {
1714                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1715                    protobuf::CreateViewNode {
1716                        name: Some(name.clone().into()),
1717                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1718                            input,
1719                            extension_codec,
1720                        )?)),
1721                        or_replace: *or_replace,
1722                        temporary: *temporary,
1723                        definition: definition.clone().unwrap_or_default(),
1724                    },
1725                ))),
1726            }),
1727            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1728                CreateCatalogSchema {
1729                    schema_name,
1730                    if_not_exists,
1731                    schema: df_schema,
1732                },
1733            )) => Ok(LogicalPlanNode {
1734                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1735                    protobuf::CreateCatalogSchemaNode {
1736                        schema_name: schema_name.clone(),
1737                        if_not_exists: *if_not_exists,
1738                        schema: Some(df_schema.try_into()?),
1739                    },
1740                )),
1741            }),
1742            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1743                catalog_name,
1744                if_not_exists,
1745                schema: df_schema,
1746            })) => Ok(LogicalPlanNode {
1747                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1748                    protobuf::CreateCatalogNode {
1749                        catalog_name: catalog_name.clone(),
1750                        if_not_exists: *if_not_exists,
1751                        schema: Some(df_schema.try_into()?),
1752                    },
1753                )),
1754            }),
1755            LogicalPlan::Analyze(a) => {
1756                let input = LogicalPlanNode::try_from_logical_plan(
1757                    a.input.as_ref(),
1758                    extension_codec,
1759                )?;
1760                Ok(LogicalPlanNode {
1761                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1762                        protobuf::AnalyzeNode {
1763                            input: Some(Box::new(input)),
1764                            verbose: a.verbose,
1765                        },
1766                    ))),
1767                })
1768            }
1769            LogicalPlan::Explain(a) => {
1770                let input = LogicalPlanNode::try_from_logical_plan(
1771                    a.plan.as_ref(),
1772                    extension_codec,
1773                )?;
1774                Ok(LogicalPlanNode {
1775                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1776                        protobuf::ExplainNode {
1777                            input: Some(Box::new(input)),
1778                            verbose: a.verbose,
1779                            format: match &a.explain_format {
1780                                ExplainFormat::Indent => protobuf::ExplainFormat::Indent,
1781                                ExplainFormat::Tree => protobuf::ExplainFormat::Tree,
1782                                ExplainFormat::PostgresJSON => {
1783                                    protobuf::ExplainFormat::Pgjson
1784                                }
1785                                ExplainFormat::Graphviz => {
1786                                    protobuf::ExplainFormat::Graphviz
1787                                }
1788                            }
1789                            .into(),
1790                        },
1791                    ))),
1792                })
1793            }
1794            LogicalPlan::Union(union) => {
1795                let inputs: Vec<LogicalPlanNode> = union
1796                    .inputs
1797                    .iter()
1798                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1799                    .collect::<Result<_>>()?;
1800                Ok(LogicalPlanNode {
1801                    logical_plan_type: Some(LogicalPlanType::Union(
1802                        protobuf::UnionNode { inputs },
1803                    )),
1804                })
1805            }
1806            LogicalPlan::Extension(extension) => {
1807                let mut buf: Vec<u8> = vec![];
1808                extension_codec.try_encode(extension, &mut buf)?;
1809
1810                let inputs: Vec<LogicalPlanNode> = extension
1811                    .node
1812                    .inputs()
1813                    .iter()
1814                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1815                    .collect::<Result<_>>()?;
1816
1817                Ok(LogicalPlanNode {
1818                    logical_plan_type: Some(LogicalPlanType::Extension(
1819                        LogicalExtensionNode { node: buf, inputs },
1820                    )),
1821                })
1822            }
1823            LogicalPlan::Statement(Statement::Prepare(Prepare {
1824                name,
1825                fields,
1826                input,
1827            })) => {
1828                let input =
1829                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1830                Ok(LogicalPlanNode {
1831                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1832                        protobuf::PrepareNode {
1833                            name: name.clone(),
1834                            input: Some(Box::new(input)),
1835                            // Store the DataTypes for reading by older DataFusion
1836                            data_types: fields
1837                                .iter()
1838                                .map(|f| f.data_type().try_into())
1839                                .collect::<Result<Vec<_>, _>>()?,
1840                            // Store the Fields for current and future DataFusion
1841                            fields: fields
1842                                .iter()
1843                                .map(|f| f.as_ref().try_into())
1844                                .collect::<Result<Vec<_>, _>>()?,
1845                        },
1846                    ))),
1847                })
1848            }
1849            LogicalPlan::Unnest(Unnest {
1850                input,
1851                exec_columns,
1852                list_type_columns,
1853                struct_type_columns,
1854                dependency_indices,
1855                schema,
1856                options,
1857            }) => {
1858                let input =
1859                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1860                let proto_unnest_list_items = list_type_columns
1861                    .iter()
1862                    .map(|(index, ul)| ColumnUnnestListItem {
1863                        input_index: *index as _,
1864                        recursion: Some(ColumnUnnestListRecursion {
1865                            output_column: Some(ul.output_column.to_owned().into()),
1866                            depth: ul.depth as _,
1867                        }),
1868                    })
1869                    .collect();
1870                Ok(LogicalPlanNode {
1871                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1872                        protobuf::UnnestNode {
1873                            input: Some(Box::new(input)),
1874                            exec_columns: exec_columns
1875                                .iter()
1876                                .map(|col| col.into())
1877                                .collect(),
1878                            list_type_columns: proto_unnest_list_items,
1879                            struct_type_columns: struct_type_columns
1880                                .iter()
1881                                .map(|c| *c as u64)
1882                                .collect(),
1883                            dependency_indices: dependency_indices
1884                                .iter()
1885                                .map(|c| *c as u64)
1886                                .collect(),
1887                            schema: Some(schema.try_into()?),
1888                            options: Some(options.into()),
1889                        },
1890                    ))),
1891                })
1892            }
1893            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1894                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1895            )),
1896            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1897                "LogicalPlan serde is not yet implemented for CreateIndex",
1898            )),
1899            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1900                "LogicalPlan serde is not yet implemented for DropTable",
1901            )),
1902            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1903                name,
1904                if_exists,
1905                schema,
1906            })) => Ok(LogicalPlanNode {
1907                logical_plan_type: Some(LogicalPlanType::DropView(
1908                    protobuf::DropViewNode {
1909                        name: Some(name.clone().into()),
1910                        if_exists: *if_exists,
1911                        schema: Some(schema.try_into()?),
1912                    },
1913                )),
1914            }),
1915            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1916                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1917            )),
1918            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1919                "LogicalPlan serde is not yet implemented for CreateFunction",
1920            )),
1921            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1922                "LogicalPlan serde is not yet implemented for DropFunction",
1923            )),
1924            LogicalPlan::Statement(_) => Err(proto_error(
1925                "LogicalPlan serde is not yet implemented for Statement",
1926            )),
1927            LogicalPlan::Dml(DmlStatement {
1928                table_name,
1929                target,
1930                op,
1931                input,
1932                ..
1933            }) => {
1934                let input =
1935                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1936                let dml_type: dml_node::Type = op.into();
1937                Ok(LogicalPlanNode {
1938                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1939                        input: Some(Box::new(input)),
1940                        target: Some(Box::new(from_table_source(
1941                            table_name.clone(),
1942                            Arc::clone(target),
1943                            extension_codec,
1944                        )?)),
1945                        table_name: Some(table_name.clone().into()),
1946                        dml_type: dml_type.into(),
1947                    }))),
1948                })
1949            }
1950            LogicalPlan::Copy(dml::CopyTo {
1951                input,
1952                output_url,
1953                file_type,
1954                partition_by,
1955                ..
1956            }) => {
1957                let input =
1958                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1959                let mut buf = Vec::new();
1960                extension_codec
1961                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1962
1963                Ok(LogicalPlanNode {
1964                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1965                        protobuf::CopyToNode {
1966                            input: Some(Box::new(input)),
1967                            output_url: output_url.to_string(),
1968                            file_type: buf,
1969                            partition_by: partition_by.clone(),
1970                        },
1971                    ))),
1972                })
1973            }
1974            LogicalPlan::DescribeTable(_) => Err(proto_error(
1975                "LogicalPlan serde is not yet implemented for DescribeTable",
1976            )),
1977            LogicalPlan::RecursiveQuery(recursive) => {
1978                let static_term = LogicalPlanNode::try_from_logical_plan(
1979                    recursive.static_term.as_ref(),
1980                    extension_codec,
1981                )?;
1982                let recursive_term = LogicalPlanNode::try_from_logical_plan(
1983                    recursive.recursive_term.as_ref(),
1984                    extension_codec,
1985                )?;
1986
1987                Ok(LogicalPlanNode {
1988                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1989                        protobuf::RecursiveQueryNode {
1990                            name: recursive.name.clone(),
1991                            static_term: Some(Box::new(static_term)),
1992                            recursive_term: Some(Box::new(recursive_term)),
1993                            is_distinct: recursive.is_distinct,
1994                        },
1995                    ))),
1996                })
1997            }
1998        }
1999    }
2000}