1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "parquet")]
39use datafusion::datasource::file_format::parquet::ParquetFormat;
40use datafusion::datasource::file_format::{
41 file_type_to_format, format_as_file_type, FileFormatFactory,
42};
43use datafusion::{
44 datasource::{
45 file_format::{
46 avro::AvroFormat, csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat,
47 FileFormat,
48 },
49 listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
50 view::ViewTable,
51 TableProvider,
52 },
53 datasource::{provider_as_source, source_as_provider},
54 prelude::SessionContext,
55};
56use datafusion_common::file_options::file_type::FileType;
57use datafusion_common::{
58 context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
59 DataFusionError, Result, TableReference, ToDFSchema,
60};
61use datafusion_expr::{
62 dml,
63 logical_plan::{
64 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
65 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
66 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
67 SubqueryAlias, TableScan, Values, Window,
68 },
69 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
70 Statement, WindowUDF,
71};
72use datafusion_expr::{
73 AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
74 TableSource, Unnest,
75};
76
77use self::to_proto::{serialize_expr, serialize_exprs};
78use crate::logical_plan::to_proto::serialize_sorts;
79use prost::bytes::BufMut;
80use prost::Message;
81
82pub mod file_formats;
83pub mod from_proto;
84pub mod to_proto;
85
86pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
87 fn try_decode(buf: &[u8]) -> Result<Self>
88 where
89 Self: Sized;
90
91 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
92 where
93 B: BufMut,
94 Self: Sized;
95
96 fn try_into_logical_plan(
97 &self,
98 ctx: &SessionContext,
99 extension_codec: &dyn LogicalExtensionCodec,
100 ) -> Result<LogicalPlan>;
101
102 fn try_from_logical_plan(
103 plan: &LogicalPlan,
104 extension_codec: &dyn LogicalExtensionCodec,
105 ) -> Result<Self>
106 where
107 Self: Sized;
108}
109
110pub trait LogicalExtensionCodec: Debug + Send + Sync {
111 fn try_decode(
112 &self,
113 buf: &[u8],
114 inputs: &[LogicalPlan],
115 ctx: &SessionContext,
116 ) -> Result<Extension>;
117
118 fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
119
120 fn try_decode_table_provider(
121 &self,
122 buf: &[u8],
123 table_ref: &TableReference,
124 schema: SchemaRef,
125 ctx: &SessionContext,
126 ) -> Result<Arc<dyn TableProvider>>;
127
128 fn try_encode_table_provider(
129 &self,
130 table_ref: &TableReference,
131 node: Arc<dyn TableProvider>,
132 buf: &mut Vec<u8>,
133 ) -> Result<()>;
134
135 fn try_decode_file_format(
136 &self,
137 _buf: &[u8],
138 _ctx: &SessionContext,
139 ) -> Result<Arc<dyn FileFormatFactory>> {
140 not_impl_err!("LogicalExtensionCodec is not provided for file format")
141 }
142
143 fn try_encode_file_format(
144 &self,
145 _buf: &mut Vec<u8>,
146 _node: Arc<dyn FileFormatFactory>,
147 ) -> Result<()> {
148 Ok(())
149 }
150
151 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
152 not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
153 }
154
155 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
156 Ok(())
157 }
158
159 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
160 not_impl_err!(
161 "LogicalExtensionCodec is not provided for aggregate function {name}"
162 )
163 }
164
165 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
166 Ok(())
167 }
168
169 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
170 not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
171 }
172
173 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
174 Ok(())
175 }
176}
177
178#[derive(Debug, Clone)]
179pub struct DefaultLogicalExtensionCodec {}
180
181impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
182 fn try_decode(
183 &self,
184 _buf: &[u8],
185 _inputs: &[LogicalPlan],
186 _ctx: &SessionContext,
187 ) -> Result<Extension> {
188 not_impl_err!("LogicalExtensionCodec is not provided")
189 }
190
191 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
192 not_impl_err!("LogicalExtensionCodec is not provided")
193 }
194
195 fn try_decode_table_provider(
196 &self,
197 _buf: &[u8],
198 _table_ref: &TableReference,
199 _schema: SchemaRef,
200 _ctx: &SessionContext,
201 ) -> Result<Arc<dyn TableProvider>> {
202 not_impl_err!("LogicalExtensionCodec is not provided")
203 }
204
205 fn try_encode_table_provider(
206 &self,
207 _table_ref: &TableReference,
208 _node: Arc<dyn TableProvider>,
209 _buf: &mut Vec<u8>,
210 ) -> Result<()> {
211 not_impl_err!("LogicalExtensionCodec is not provided")
212 }
213}
214
/// Decodes a required child-plan field of a protobuf node into a
/// [`LogicalPlan`].
///
/// `$PB` is an optional boxed plan node (e.g. `node.input`). When it is
/// present, the inner node's `try_into_logical_plan($CTX, $CODEC)` is called
/// and its result returned. When it is absent, the expansion evaluates to an
/// `Err` built from "Missing required field in protobuf".
///
/// The error constructor is referenced through `$crate`, so callers do not
/// need `proto_error` in scope. The `AsLogicalPlan` trait (which provides
/// `try_into_logical_plan`) must still be in scope at the call site.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
225
226fn from_table_reference(
227 table_ref: Option<&protobuf::TableReference>,
228 error_context: &str,
229) -> Result<TableReference> {
230 let table_ref = table_ref.ok_or_else(|| {
231 DataFusionError::Internal(format!(
232 "Protobuf deserialization error, {error_context} was missing required field name."
233 ))
234 })?;
235
236 Ok(table_ref.clone().try_into()?)
237}
238
239fn to_table_source(
243 node: &Option<Box<LogicalPlanNode>>,
244 ctx: &SessionContext,
245 extension_codec: &dyn LogicalExtensionCodec,
246) -> Result<Arc<dyn TableSource>> {
247 if let Some(node) = node {
248 match node.try_into_logical_plan(ctx, extension_codec)? {
249 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
250 _ => plan_err!("expected TableScan node"),
251 }
252 } else {
253 plan_err!("LogicalPlanNode should be provided")
254 }
255}
256
257fn from_table_source(
261 table_name: TableReference,
262 target: Arc<dyn TableSource>,
263 extension_codec: &dyn LogicalExtensionCodec,
264) -> Result<LogicalPlanNode> {
265 let projected_schema = target.schema().to_dfschema_ref()?;
266 let r = LogicalPlan::TableScan(TableScan {
267 table_name,
268 source: target,
269 projection: None,
270 projected_schema,
271 filters: vec![],
272 fetch: None,
273 });
274
275 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
276}
277
278impl AsLogicalPlan for LogicalPlanNode {
279 fn try_decode(buf: &[u8]) -> Result<Self>
280 where
281 Self: Sized,
282 {
283 LogicalPlanNode::decode(buf).map_err(|e| {
284 DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
285 })
286 }
287
288 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
289 where
290 B: BufMut,
291 Self: Sized,
292 {
293 self.encode(buf).map_err(|e| {
294 DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
295 })
296 }
297
298 fn try_into_logical_plan(
299 &self,
300 ctx: &SessionContext,
301 extension_codec: &dyn LogicalExtensionCodec,
302 ) -> Result<LogicalPlan> {
303 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
304 proto_error(format!(
305 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
306 ))
307 })?;
308 match plan {
309 LogicalPlanType::Values(values) => {
310 let n_cols = values.n_cols as usize;
311 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
312 Ok(Vec::new())
313 } else if values.values_list.len() % n_cols != 0 {
314 internal_err!(
315 "Invalid values list length, expect {} to be divisible by {}",
316 values.values_list.len(),
317 n_cols
318 )
319 } else {
320 values
321 .values_list
322 .chunks_exact(n_cols)
323 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
324 .collect::<Result<Vec<_>, _>>()
325 .map_err(|e| e.into())
326 }?;
327
328 LogicalPlanBuilder::values(values)?.build()
329 }
330 LogicalPlanType::Projection(projection) => {
331 let input: LogicalPlan =
332 into_logical_plan!(projection.input, ctx, extension_codec)?;
333 let expr: Vec<Expr> =
334 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
335
336 let new_proj = project(input, expr)?;
337 match projection.optional_alias.as_ref() {
338 Some(a) => match a {
339 protobuf::projection_node::OptionalAlias::Alias(alias) => {
340 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
341 Arc::new(new_proj),
342 alias.clone(),
343 )?))
344 }
345 },
346 _ => Ok(new_proj),
347 }
348 }
349 LogicalPlanType::Selection(selection) => {
350 let input: LogicalPlan =
351 into_logical_plan!(selection.input, ctx, extension_codec)?;
352 let expr: Expr = selection
353 .expr
354 .as_ref()
355 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
356 .transpose()?
357 .ok_or_else(|| {
358 DataFusionError::Internal("expression required".to_string())
359 })?;
360 LogicalPlanBuilder::from(input).filter(expr)?.build()
362 }
363 LogicalPlanType::Window(window) => {
364 let input: LogicalPlan =
365 into_logical_plan!(window.input, ctx, extension_codec)?;
366 let window_expr =
367 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
368 LogicalPlanBuilder::from(input).window(window_expr)?.build()
369 }
370 LogicalPlanType::Aggregate(aggregate) => {
371 let input: LogicalPlan =
372 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
373 let group_expr =
374 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
375 let aggr_expr =
376 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
377 LogicalPlanBuilder::from(input)
378 .aggregate(group_expr, aggr_expr)?
379 .build()
380 }
381 LogicalPlanType::ListingScan(scan) => {
382 let schema: Schema = convert_required!(scan.schema)?;
383
384 let mut projection = None;
385 if let Some(columns) = &scan.projection {
386 let column_indices = columns
387 .columns
388 .iter()
389 .map(|name| schema.index_of(name))
390 .collect::<Result<Vec<usize>, _>>()?;
391 projection = Some(column_indices);
392 }
393
394 let filters =
395 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
396
397 let mut all_sort_orders = vec![];
398 for order in &scan.file_sort_order {
399 all_sort_orders.push(from_proto::parse_sorts(
400 &order.sort_expr_nodes,
401 ctx,
402 extension_codec,
403 )?)
404 }
405
406 let file_format: Arc<dyn FileFormat> =
407 match scan.file_format_type.as_ref().ok_or_else(|| {
408 proto_error(format!(
409 "logical_plan::from_proto() Unsupported file format '{self:?}'"
410 ))
411 })? {
412 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
413 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
414 #[cfg(feature = "parquet")]
415 {
416 let mut parquet = ParquetFormat::default();
417 if let Some(options) = options {
418 parquet = parquet.with_options(options.try_into()?)
419 }
420 Arc::new(parquet)
421 }
422 #[cfg(not(feature = "parquet"))]
423 panic!("Unable to process parquet file since `parquet` feature is not enabled");
424 }
425 FileFormatType::Csv(protobuf::CsvFormat {
426 options
427 }) => {
428 let mut csv = CsvFormat::default();
429 if let Some(options) = options {
430 csv = csv.with_options(options.try_into()?)
431 }
432 Arc::new(csv)
433 },
434 FileFormatType::Json(protobuf::NdJsonFormat {
435 options
436 }) => {
437 let mut json = OtherNdJsonFormat::default();
438 if let Some(options) = options {
439 json = json.with_options(options.try_into()?)
440 }
441 Arc::new(json)
442 }
443 FileFormatType::Avro(..) => Arc::new(AvroFormat),
444 };
445
446 let table_paths = &scan
447 .paths
448 .iter()
449 .map(ListingTableUrl::parse)
450 .collect::<Result<Vec<_>, _>>()?;
451
452 let options = ListingOptions::new(file_format)
453 .with_file_extension(&scan.file_extension)
454 .with_table_partition_cols(
455 scan.table_partition_cols
456 .iter()
457 .map(|col| {
458 (
459 col.clone(),
460 schema
461 .field_with_name(col)
462 .unwrap()
463 .data_type()
464 .clone(),
465 )
466 })
467 .collect(),
468 )
469 .with_collect_stat(scan.collect_stat)
470 .with_target_partitions(scan.target_partitions as usize)
471 .with_file_sort_order(all_sort_orders);
472
473 let config =
474 ListingTableConfig::new_with_multi_paths(table_paths.clone())
475 .with_listing_options(options)
476 .with_schema(Arc::new(schema));
477
478 let provider = ListingTable::try_new(config)?.with_cache(
479 ctx.state()
480 .runtime_env()
481 .cache_manager
482 .get_file_statistic_cache(),
483 );
484
485 let table_name =
486 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
487
488 LogicalPlanBuilder::scan_with_filters(
489 table_name,
490 provider_as_source(Arc::new(provider)),
491 projection,
492 filters,
493 )?
494 .build()
495 }
496 LogicalPlanType::CustomScan(scan) => {
497 let schema: Schema = convert_required!(scan.schema)?;
498 let schema = Arc::new(schema);
499 let mut projection = None;
500 if let Some(columns) = &scan.projection {
501 let column_indices = columns
502 .columns
503 .iter()
504 .map(|name| schema.index_of(name))
505 .collect::<Result<Vec<usize>, _>>()?;
506 projection = Some(column_indices);
507 }
508
509 let filters =
510 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
511
512 let table_name =
513 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
514
515 let provider = extension_codec.try_decode_table_provider(
516 &scan.custom_table_data,
517 &table_name,
518 schema,
519 ctx,
520 )?;
521
522 LogicalPlanBuilder::scan_with_filters(
523 table_name,
524 provider_as_source(provider),
525 projection,
526 filters,
527 )?
528 .build()
529 }
530 LogicalPlanType::Sort(sort) => {
531 let input: LogicalPlan =
532 into_logical_plan!(sort.input, ctx, extension_codec)?;
533 let sort_expr: Vec<SortExpr> =
534 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
535 let fetch: Option<usize> = sort.fetch.try_into().ok();
536 LogicalPlanBuilder::from(input)
537 .sort_with_limit(sort_expr, fetch)?
538 .build()
539 }
540 LogicalPlanType::Repartition(repartition) => {
541 use datafusion::logical_expr::Partitioning;
542 let input: LogicalPlan =
543 into_logical_plan!(repartition.input, ctx, extension_codec)?;
544 use protobuf::repartition_node::PartitionMethod;
545 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
546 internal_datafusion_err!(
547 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
548 )
549 })?;
550
551 let partitioning_scheme = match pb_partition_method {
552 PartitionMethod::Hash(protobuf::HashRepartition {
553 hash_expr: pb_hash_expr,
554 partition_count,
555 }) => Partitioning::Hash(
556 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
557 *partition_count as usize,
558 ),
559 PartitionMethod::RoundRobin(partition_count) => {
560 Partitioning::RoundRobinBatch(*partition_count as usize)
561 }
562 };
563
564 LogicalPlanBuilder::from(input)
565 .repartition(partitioning_scheme)?
566 .build()
567 }
568 LogicalPlanType::EmptyRelation(empty_relation) => {
569 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
570 }
571 LogicalPlanType::CreateExternalTable(create_extern_table) => {
572 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
573 DataFusionError::Internal(String::from(
574 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
575 ))
576 })?;
577
578 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
579 DataFusionError::Internal(String::from(
580 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
581 ))
582 })?;
583 let definition = if !create_extern_table.definition.is_empty() {
584 Some(create_extern_table.definition.clone())
585 } else {
586 None
587 };
588
589 let file_type = create_extern_table.file_type.as_str();
590 if ctx.table_factory(file_type).is_none() {
591 internal_err!("No TableProviderFactory for file type: {file_type}")?
592 }
593
594 let mut order_exprs = vec![];
595 for expr in &create_extern_table.order_exprs {
596 order_exprs.push(from_proto::parse_sorts(
597 &expr.sort_expr_nodes,
598 ctx,
599 extension_codec,
600 )?);
601 }
602
603 let mut column_defaults =
604 HashMap::with_capacity(create_extern_table.column_defaults.len());
605 for (col_name, expr) in &create_extern_table.column_defaults {
606 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
607 column_defaults.insert(col_name.clone(), expr);
608 }
609
610 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
611 CreateExternalTable {
612 schema: pb_schema.try_into()?,
613 name: from_table_reference(
614 create_extern_table.name.as_ref(),
615 "CreateExternalTable",
616 )?,
617 location: create_extern_table.location.clone(),
618 file_type: create_extern_table.file_type.clone(),
619 table_partition_cols: create_extern_table
620 .table_partition_cols
621 .clone(),
622 order_exprs,
623 if_not_exists: create_extern_table.if_not_exists,
624 temporary: create_extern_table.temporary,
625 definition,
626 unbounded: create_extern_table.unbounded,
627 options: create_extern_table.options.clone(),
628 constraints: constraints.into(),
629 column_defaults,
630 },
631 )))
632 }
633 LogicalPlanType::CreateView(create_view) => {
634 let plan = create_view
635 .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
636 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
637 )))?
638 .try_into_logical_plan(ctx, extension_codec)?;
639 let definition = if !create_view.definition.is_empty() {
640 Some(create_view.definition.clone())
641 } else {
642 None
643 };
644
645 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
647 temporary: create_view.temporary,
648 input: Arc::new(plan),
649 or_replace: create_view.or_replace,
650 definition,
651 })))
652 }
653 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
654 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
655 DataFusionError::Internal(String::from(
656 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
657 ))
658 })?;
659
660 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
661 CreateCatalogSchema {
662 schema_name: create_catalog_schema.schema_name.clone(),
663 if_not_exists: create_catalog_schema.if_not_exists,
664 schema: pb_schema.try_into()?,
665 },
666 )))
667 }
668 LogicalPlanType::CreateCatalog(create_catalog) => {
669 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
670 DataFusionError::Internal(String::from(
671 "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
672 ))
673 })?;
674
675 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
676 CreateCatalog {
677 catalog_name: create_catalog.catalog_name.clone(),
678 if_not_exists: create_catalog.if_not_exists,
679 schema: pb_schema.try_into()?,
680 },
681 )))
682 }
683 LogicalPlanType::Analyze(analyze) => {
684 let input: LogicalPlan =
685 into_logical_plan!(analyze.input, ctx, extension_codec)?;
686 LogicalPlanBuilder::from(input)
687 .explain(analyze.verbose, true)?
688 .build()
689 }
690 LogicalPlanType::Explain(explain) => {
691 let input: LogicalPlan =
692 into_logical_plan!(explain.input, ctx, extension_codec)?;
693 LogicalPlanBuilder::from(input)
694 .explain(explain.verbose, false)?
695 .build()
696 }
697 LogicalPlanType::SubqueryAlias(aliased_relation) => {
698 let input: LogicalPlan =
699 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
700 let alias = from_table_reference(
701 aliased_relation.alias.as_ref(),
702 "SubqueryAlias",
703 )?;
704 LogicalPlanBuilder::from(input).alias(alias)?.build()
705 }
706 LogicalPlanType::Limit(limit) => {
707 let input: LogicalPlan =
708 into_logical_plan!(limit.input, ctx, extension_codec)?;
709 let skip = limit.skip.max(0) as usize;
710
711 let fetch = if limit.fetch < 0 {
712 None
713 } else {
714 Some(limit.fetch as usize)
715 };
716
717 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
718 }
719 LogicalPlanType::Join(join) => {
720 let left_keys: Vec<Expr> =
721 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
722 let right_keys: Vec<Expr> =
723 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
724 let join_type =
725 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
726 proto_error(format!(
727 "Received a JoinNode message with unknown JoinType {}",
728 join.join_type
729 ))
730 })?;
731 let join_constraint = protobuf::JoinConstraint::try_from(
732 join.join_constraint,
733 )
734 .map_err(|_| {
735 proto_error(format!(
736 "Received a JoinNode message with unknown JoinConstraint {}",
737 join.join_constraint
738 ))
739 })?;
740 let filter: Option<Expr> = join
741 .filter
742 .as_ref()
743 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
744 .map_or(Ok(None), |v| v.map(Some))?;
745
746 let builder = LogicalPlanBuilder::from(into_logical_plan!(
747 join.left,
748 ctx,
749 extension_codec
750 )?);
751 let builder = match join_constraint.into() {
752 JoinConstraint::On => builder.join_with_expr_keys(
753 into_logical_plan!(join.right, ctx, extension_codec)?,
754 join_type.into(),
755 (left_keys, right_keys),
756 filter,
757 )?,
758 JoinConstraint::Using => {
759 let using_keys = left_keys
761 .into_iter()
762 .map(|key| {
763 key.try_as_col().cloned()
764 .ok_or_else(|| internal_datafusion_err!(
765 "Using join keys must be column references, got: {key:?}"
766 ))
767 })
768 .collect::<Result<Vec<_>, _>>()?;
769 builder.join_using(
770 into_logical_plan!(join.right, ctx, extension_codec)?,
771 join_type.into(),
772 using_keys,
773 )?
774 }
775 };
776
777 builder.build()
778 }
779 LogicalPlanType::Union(union) => {
780 if union.inputs.len() < 2 {
781 return Err( DataFusionError::Internal(String::from(
782 "Protobuf deserialization error, Union was require at least two input.",
783 )));
784 }
785 let (first, rest) = union.inputs.split_first().unwrap();
786 let mut builder = LogicalPlanBuilder::from(
787 first.try_into_logical_plan(ctx, extension_codec)?,
788 );
789
790 for i in rest {
791 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
792 builder = builder.union(plan)?;
793 }
794 builder.build()
795 }
796 LogicalPlanType::CrossJoin(crossjoin) => {
797 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
798 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
799
800 LogicalPlanBuilder::from(left).cross_join(right)?.build()
801 }
802 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
803 let input_plans: Vec<LogicalPlan> = inputs
804 .iter()
805 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
806 .collect::<Result<_>>()?;
807
808 let extension_node =
809 extension_codec.try_decode(node, &input_plans, ctx)?;
810 Ok(LogicalPlan::Extension(extension_node))
811 }
812 LogicalPlanType::Distinct(distinct) => {
813 let input: LogicalPlan =
814 into_logical_plan!(distinct.input, ctx, extension_codec)?;
815 LogicalPlanBuilder::from(input).distinct()?.build()
816 }
817 LogicalPlanType::DistinctOn(distinct_on) => {
818 let input: LogicalPlan =
819 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
820 let on_expr =
821 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
822 let select_expr = from_proto::parse_exprs(
823 &distinct_on.select_expr,
824 ctx,
825 extension_codec,
826 )?;
827 let sort_expr = match distinct_on.sort_expr.len() {
828 0 => None,
829 _ => Some(from_proto::parse_sorts(
830 &distinct_on.sort_expr,
831 ctx,
832 extension_codec,
833 )?),
834 };
835 LogicalPlanBuilder::from(input)
836 .distinct_on(on_expr, select_expr, sort_expr)?
837 .build()
838 }
839 LogicalPlanType::ViewScan(scan) => {
840 let schema: Schema = convert_required!(scan.schema)?;
841
842 let mut projection = None;
843 if let Some(columns) = &scan.projection {
844 let column_indices = columns
845 .columns
846 .iter()
847 .map(|name| schema.index_of(name))
848 .collect::<Result<Vec<usize>, _>>()?;
849 projection = Some(column_indices);
850 }
851
852 let input: LogicalPlan =
853 into_logical_plan!(scan.input, ctx, extension_codec)?;
854
855 let definition = if !scan.definition.is_empty() {
856 Some(scan.definition.clone())
857 } else {
858 None
859 };
860
861 let provider = ViewTable::try_new(input, definition)?;
862
863 let table_name =
864 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
865
866 LogicalPlanBuilder::scan(
867 table_name,
868 provider_as_source(Arc::new(provider)),
869 projection,
870 )?
871 .build()
872 }
873 LogicalPlanType::Prepare(prepare) => {
874 let input: LogicalPlan =
875 into_logical_plan!(prepare.input, ctx, extension_codec)?;
876 let data_types: Vec<DataType> = prepare
877 .data_types
878 .iter()
879 .map(DataType::try_from)
880 .collect::<Result<_, _>>()?;
881 LogicalPlanBuilder::from(input)
882 .prepare(prepare.name.clone(), data_types)?
883 .build()
884 }
885 LogicalPlanType::DropView(dropview) => {
886 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
887 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
888 if_exists: dropview.if_exists,
889 schema: Arc::new(convert_required!(dropview.schema)?),
890 })))
891 }
892 LogicalPlanType::CopyTo(copy) => {
893 let input: LogicalPlan =
894 into_logical_plan!(copy.input, ctx, extension_codec)?;
895
896 let file_type: Arc<dyn FileType> = format_as_file_type(
897 extension_codec.try_decode_file_format(©.file_type, ctx)?,
898 );
899
900 Ok(LogicalPlan::Copy(dml::CopyTo {
901 input: Arc::new(input),
902 output_url: copy.output_url.clone(),
903 partition_by: copy.partition_by.clone(),
904 file_type,
905 options: Default::default(),
906 }))
907 }
908 LogicalPlanType::Unnest(unnest) => {
909 let input: LogicalPlan =
910 into_logical_plan!(unnest.input, ctx, extension_codec)?;
911 Ok(LogicalPlan::Unnest(Unnest {
912 input: Arc::new(input),
913 exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
914 list_type_columns: unnest
915 .list_type_columns
916 .iter()
917 .map(|c| {
918 let recursion_item = c.recursion.as_ref().unwrap();
919 (
920 c.input_index as _,
921 ColumnUnnestList {
922 output_column: recursion_item
923 .output_column
924 .as_ref()
925 .unwrap()
926 .into(),
927 depth: recursion_item.depth as _,
928 },
929 )
930 })
931 .collect(),
932 struct_type_columns: unnest
933 .struct_type_columns
934 .iter()
935 .map(|c| *c as usize)
936 .collect(),
937 dependency_indices: unnest
938 .dependency_indices
939 .iter()
940 .map(|c| *c as usize)
941 .collect(),
942 schema: Arc::new(convert_required!(unnest.schema)?),
943 options: into_required!(unnest.options)?,
944 }))
945 }
946 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
947 let static_term = recursive_query_node
948 .static_term
949 .as_ref()
950 .ok_or_else(|| DataFusionError::Internal(String::from(
951 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
952 )))?
953 .try_into_logical_plan(ctx, extension_codec)?;
954
955 let recursive_term = recursive_query_node
956 .recursive_term
957 .as_ref()
958 .ok_or_else(|| DataFusionError::Internal(String::from(
959 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
960 )))?
961 .try_into_logical_plan(ctx, extension_codec)?;
962
963 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
964 name: recursive_query_node.name.clone(),
965 static_term: Arc::new(static_term),
966 recursive_term: Arc::new(recursive_term),
967 is_distinct: recursive_query_node.is_distinct,
968 }))
969 }
970 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
971 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
972 let schema = convert_required!(*schema)?;
973 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
974 LogicalPlanBuilder::scan(
975 name.as_str(),
976 provider_as_source(Arc::new(cte_work_table)),
977 None,
978 )?
979 .build()
980 }
981 LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
982 datafusion::logical_expr::DmlStatement::new(
983 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
984 to_table_source(&dml_node.target, ctx, extension_codec)?,
985 dml_node.dml_type().into(),
986 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
987 ),
988 )),
989 }
990 }
991
992 fn try_from_logical_plan(
993 plan: &LogicalPlan,
994 extension_codec: &dyn LogicalExtensionCodec,
995 ) -> Result<Self>
996 where
997 Self: Sized,
998 {
999 match plan {
1000 LogicalPlan::Values(Values { values, .. }) => {
1001 let n_cols = if values.is_empty() {
1002 0
1003 } else {
1004 values[0].len()
1005 } as u64;
1006 let values_list =
1007 serialize_exprs(values.iter().flatten(), extension_codec)?;
1008 Ok(LogicalPlanNode {
1009 logical_plan_type: Some(LogicalPlanType::Values(
1010 protobuf::ValuesNode {
1011 n_cols,
1012 values_list,
1013 },
1014 )),
1015 })
1016 }
1017 LogicalPlan::TableScan(TableScan {
1018 table_name,
1019 source,
1020 filters,
1021 projection,
1022 ..
1023 }) => {
1024 let provider = source_as_provider(source)?;
1025 let schema = provider.schema();
1026 let source = provider.as_any();
1027
1028 let projection = match projection {
1029 None => None,
1030 Some(columns) => {
1031 let column_names = columns
1032 .iter()
1033 .map(|i| schema.field(*i).name().to_owned())
1034 .collect();
1035 Some(protobuf::ProjectionColumns {
1036 columns: column_names,
1037 })
1038 }
1039 };
1040 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1041
1042 let filters: Vec<protobuf::LogicalExprNode> =
1043 serialize_exprs(filters, extension_codec)?;
1044
1045 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1046 let any = listing_table.options().format.as_any();
1047 let file_format_type = {
1048 let mut maybe_some_type = None;
1049
1050 #[cfg(feature = "parquet")]
1051 if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1052 let options = parquet.options();
1053 maybe_some_type =
1054 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1055 options: Some(options.try_into()?),
1056 }));
1057 };
1058
1059 if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1060 let options = csv.options();
1061 maybe_some_type =
1062 Some(FileFormatType::Csv(protobuf::CsvFormat {
1063 options: Some(options.try_into()?),
1064 }));
1065 }
1066
1067 if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1068 let options = json.options();
1069 maybe_some_type =
1070 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1071 options: Some(options.try_into()?),
1072 }))
1073 }
1074
1075 if any.is::<AvroFormat>() {
1076 maybe_some_type =
1077 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1078 }
1079
1080 if let Some(file_format_type) = maybe_some_type {
1081 file_format_type
1082 } else {
1083 return Err(proto_error(format!(
1084 "Error converting file format, {:?} is invalid as a datafusion format.",
1085 listing_table.options().format
1086 )));
1087 }
1088 };
1089
1090 let options = listing_table.options();
1091
1092 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1093 for order in &options.file_sort_order {
1094 let expr_vec = SortExprNodeCollection {
1095 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1096 };
1097 exprs_vec.push(expr_vec);
1098 }
1099
1100 Ok(LogicalPlanNode {
1101 logical_plan_type: Some(LogicalPlanType::ListingScan(
1102 protobuf::ListingTableScanNode {
1103 file_format_type: Some(file_format_type),
1104 table_name: Some(table_name.clone().into()),
1105 collect_stat: options.collect_stat,
1106 file_extension: options.file_extension.clone(),
1107 table_partition_cols: options
1108 .table_partition_cols
1109 .iter()
1110 .map(|x| x.0.clone())
1111 .collect::<Vec<_>>(),
1112 paths: listing_table
1113 .table_paths()
1114 .iter()
1115 .map(|x| x.to_string())
1116 .collect(),
1117 schema: Some(schema),
1118 projection,
1119 filters,
1120 target_partitions: options.target_partitions as u32,
1121 file_sort_order: exprs_vec,
1122 },
1123 )),
1124 })
1125 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1126 Ok(LogicalPlanNode {
1127 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1128 protobuf::ViewTableScanNode {
1129 table_name: Some(table_name.clone().into()),
1130 input: Some(Box::new(
1131 LogicalPlanNode::try_from_logical_plan(
1132 view_table.logical_plan(),
1133 extension_codec,
1134 )?,
1135 )),
1136 schema: Some(schema),
1137 projection,
1138 definition: view_table
1139 .definition()
1140 .map(|s| s.to_string())
1141 .unwrap_or_default(),
1142 },
1143 ))),
1144 })
1145 } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1146 {
1147 let name = cte_work_table.name().to_string();
1148 let schema = cte_work_table.schema();
1149 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1150
1151 Ok(LogicalPlanNode {
1152 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1153 protobuf::CteWorkTableScanNode {
1154 name,
1155 schema: Some(schema),
1156 },
1157 )),
1158 })
1159 } else {
1160 let mut bytes = vec![];
1161 extension_codec
1162 .try_encode_table_provider(table_name, provider, &mut bytes)
1163 .map_err(|e| context!("Error serializing custom table", e))?;
1164 let scan = CustomScan(CustomTableScanNode {
1165 table_name: Some(table_name.clone().into()),
1166 projection,
1167 schema: Some(schema),
1168 filters,
1169 custom_table_data: bytes,
1170 });
1171 let node = LogicalPlanNode {
1172 logical_plan_type: Some(scan),
1173 };
1174 Ok(node)
1175 }
1176 }
1177 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1178 Ok(LogicalPlanNode {
1179 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1180 protobuf::ProjectionNode {
1181 input: Some(Box::new(
1182 LogicalPlanNode::try_from_logical_plan(
1183 input.as_ref(),
1184 extension_codec,
1185 )?,
1186 )),
1187 expr: serialize_exprs(expr, extension_codec)?,
1188 optional_alias: None,
1189 },
1190 ))),
1191 })
1192 }
1193 LogicalPlan::Filter(filter) => {
1194 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1195 filter.input.as_ref(),
1196 extension_codec,
1197 )?;
1198 Ok(LogicalPlanNode {
1199 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1200 protobuf::SelectionNode {
1201 input: Some(Box::new(input)),
1202 expr: Some(serialize_expr(
1203 &filter.predicate,
1204 extension_codec,
1205 )?),
1206 },
1207 ))),
1208 })
1209 }
1210 LogicalPlan::Distinct(Distinct::All(input)) => {
1211 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1212 input.as_ref(),
1213 extension_codec,
1214 )?;
1215 Ok(LogicalPlanNode {
1216 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1217 protobuf::DistinctNode {
1218 input: Some(Box::new(input)),
1219 },
1220 ))),
1221 })
1222 }
1223 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1224 on_expr,
1225 select_expr,
1226 sort_expr,
1227 input,
1228 ..
1229 })) => {
1230 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1231 input.as_ref(),
1232 extension_codec,
1233 )?;
1234 let sort_expr = match sort_expr {
1235 None => vec![],
1236 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1237 };
1238 Ok(LogicalPlanNode {
1239 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1240 protobuf::DistinctOnNode {
1241 on_expr: serialize_exprs(on_expr, extension_codec)?,
1242 select_expr: serialize_exprs(select_expr, extension_codec)?,
1243 sort_expr,
1244 input: Some(Box::new(input)),
1245 },
1246 ))),
1247 })
1248 }
1249 LogicalPlan::Window(Window {
1250 input, window_expr, ..
1251 }) => {
1252 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1253 input.as_ref(),
1254 extension_codec,
1255 )?;
1256 Ok(LogicalPlanNode {
1257 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1258 protobuf::WindowNode {
1259 input: Some(Box::new(input)),
1260 window_expr: serialize_exprs(window_expr, extension_codec)?,
1261 },
1262 ))),
1263 })
1264 }
1265 LogicalPlan::Aggregate(Aggregate {
1266 group_expr,
1267 aggr_expr,
1268 input,
1269 ..
1270 }) => {
1271 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1272 input.as_ref(),
1273 extension_codec,
1274 )?;
1275 Ok(LogicalPlanNode {
1276 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1277 protobuf::AggregateNode {
1278 input: Some(Box::new(input)),
1279 group_expr: serialize_exprs(group_expr, extension_codec)?,
1280 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1281 },
1282 ))),
1283 })
1284 }
1285 LogicalPlan::Join(Join {
1286 left,
1287 right,
1288 on,
1289 filter,
1290 join_type,
1291 join_constraint,
1292 null_equals_null,
1293 ..
1294 }) => {
1295 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1296 left.as_ref(),
1297 extension_codec,
1298 )?;
1299 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1300 right.as_ref(),
1301 extension_codec,
1302 )?;
1303 let (left_join_key, right_join_key) = on
1304 .iter()
1305 .map(|(l, r)| {
1306 Ok((
1307 serialize_expr(l, extension_codec)?,
1308 serialize_expr(r, extension_codec)?,
1309 ))
1310 })
1311 .collect::<Result<Vec<_>, ToProtoError>>()?
1312 .into_iter()
1313 .unzip();
1314 let join_type: protobuf::JoinType = join_type.to_owned().into();
1315 let join_constraint: protobuf::JoinConstraint =
1316 join_constraint.to_owned().into();
1317 let filter = filter
1318 .as_ref()
1319 .map(|e| serialize_expr(e, extension_codec))
1320 .map_or(Ok(None), |v| v.map(Some))?;
1321 Ok(LogicalPlanNode {
1322 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1323 protobuf::JoinNode {
1324 left: Some(Box::new(left)),
1325 right: Some(Box::new(right)),
1326 join_type: join_type.into(),
1327 join_constraint: join_constraint.into(),
1328 left_join_key,
1329 right_join_key,
1330 null_equals_null: *null_equals_null,
1331 filter,
1332 },
1333 ))),
1334 })
1335 }
1336 LogicalPlan::Subquery(_) => {
1337 not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1338 }
1339 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1340 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1341 input.as_ref(),
1342 extension_codec,
1343 )?;
1344 Ok(LogicalPlanNode {
1345 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1346 protobuf::SubqueryAliasNode {
1347 input: Some(Box::new(input)),
1348 alias: Some((*alias).clone().into()),
1349 },
1350 ))),
1351 })
1352 }
1353 LogicalPlan::Limit(limit) => {
1354 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1355 limit.input.as_ref(),
1356 extension_codec,
1357 )?;
1358 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1359 return Err(proto_error(
1360 "LogicalPlan::Limit only supports literal skip values",
1361 ));
1362 };
1363 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1364 return Err(proto_error(
1365 "LogicalPlan::Limit only supports literal fetch values",
1366 ));
1367 };
1368
1369 Ok(LogicalPlanNode {
1370 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1371 protobuf::LimitNode {
1372 input: Some(Box::new(input)),
1373 skip: skip as i64,
1374 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1375 },
1376 ))),
1377 })
1378 }
1379 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1380 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1381 input.as_ref(),
1382 extension_codec,
1383 )?;
1384 let sort_expr: Vec<protobuf::SortExprNode> =
1385 serialize_sorts(expr, extension_codec)?;
1386 Ok(LogicalPlanNode {
1387 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1388 protobuf::SortNode {
1389 input: Some(Box::new(input)),
1390 expr: sort_expr,
1391 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1392 },
1393 ))),
1394 })
1395 }
1396 LogicalPlan::Repartition(Repartition {
1397 input,
1398 partitioning_scheme,
1399 }) => {
1400 use datafusion::logical_expr::Partitioning;
1401 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1402 input.as_ref(),
1403 extension_codec,
1404 )?;
1405
1406 use protobuf::repartition_node::PartitionMethod;
1409
1410 let pb_partition_method = match partitioning_scheme {
1411 Partitioning::Hash(exprs, partition_count) => {
1412 PartitionMethod::Hash(protobuf::HashRepartition {
1413 hash_expr: serialize_exprs(exprs, extension_codec)?,
1414 partition_count: *partition_count as u64,
1415 })
1416 }
1417 Partitioning::RoundRobinBatch(partition_count) => {
1418 PartitionMethod::RoundRobin(*partition_count as u64)
1419 }
1420 Partitioning::DistributeBy(_) => {
1421 return not_impl_err!("DistributeBy")
1422 }
1423 };
1424
1425 Ok(LogicalPlanNode {
1426 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1427 protobuf::RepartitionNode {
1428 input: Some(Box::new(input)),
1429 partition_method: Some(pb_partition_method),
1430 },
1431 ))),
1432 })
1433 }
1434 LogicalPlan::EmptyRelation(EmptyRelation {
1435 produce_one_row, ..
1436 }) => Ok(LogicalPlanNode {
1437 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1438 protobuf::EmptyRelationNode {
1439 produce_one_row: *produce_one_row,
1440 },
1441 )),
1442 }),
1443 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1444 CreateExternalTable {
1445 name,
1446 location,
1447 file_type,
1448 schema: df_schema,
1449 table_partition_cols,
1450 if_not_exists,
1451 definition,
1452 order_exprs,
1453 unbounded,
1454 options,
1455 constraints,
1456 column_defaults,
1457 temporary,
1458 },
1459 )) => {
1460 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1461 for order in order_exprs {
1462 let temp = SortExprNodeCollection {
1463 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1464 };
1465 converted_order_exprs.push(temp);
1466 }
1467
1468 let mut converted_column_defaults =
1469 HashMap::with_capacity(column_defaults.len());
1470 for (col_name, expr) in column_defaults {
1471 converted_column_defaults
1472 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1473 }
1474
1475 Ok(LogicalPlanNode {
1476 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1477 protobuf::CreateExternalTableNode {
1478 name: Some(name.clone().into()),
1479 location: location.clone(),
1480 file_type: file_type.clone(),
1481 schema: Some(df_schema.try_into()?),
1482 table_partition_cols: table_partition_cols.clone(),
1483 if_not_exists: *if_not_exists,
1484 temporary: *temporary,
1485 order_exprs: converted_order_exprs,
1486 definition: definition.clone().unwrap_or_default(),
1487 unbounded: *unbounded,
1488 options: options.clone(),
1489 constraints: Some(constraints.clone().into()),
1490 column_defaults: converted_column_defaults,
1491 },
1492 )),
1493 })
1494 }
1495 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1496 name,
1497 input,
1498 or_replace,
1499 definition,
1500 temporary,
1501 })) => Ok(LogicalPlanNode {
1502 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1503 protobuf::CreateViewNode {
1504 name: Some(name.clone().into()),
1505 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1506 input,
1507 extension_codec,
1508 )?)),
1509 or_replace: *or_replace,
1510 temporary: *temporary,
1511 definition: definition.clone().unwrap_or_default(),
1512 },
1513 ))),
1514 }),
1515 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1516 CreateCatalogSchema {
1517 schema_name,
1518 if_not_exists,
1519 schema: df_schema,
1520 },
1521 )) => Ok(LogicalPlanNode {
1522 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1523 protobuf::CreateCatalogSchemaNode {
1524 schema_name: schema_name.clone(),
1525 if_not_exists: *if_not_exists,
1526 schema: Some(df_schema.try_into()?),
1527 },
1528 )),
1529 }),
1530 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1531 catalog_name,
1532 if_not_exists,
1533 schema: df_schema,
1534 })) => Ok(LogicalPlanNode {
1535 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1536 protobuf::CreateCatalogNode {
1537 catalog_name: catalog_name.clone(),
1538 if_not_exists: *if_not_exists,
1539 schema: Some(df_schema.try_into()?),
1540 },
1541 )),
1542 }),
1543 LogicalPlan::Analyze(a) => {
1544 let input = LogicalPlanNode::try_from_logical_plan(
1545 a.input.as_ref(),
1546 extension_codec,
1547 )?;
1548 Ok(LogicalPlanNode {
1549 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1550 protobuf::AnalyzeNode {
1551 input: Some(Box::new(input)),
1552 verbose: a.verbose,
1553 },
1554 ))),
1555 })
1556 }
1557 LogicalPlan::Explain(a) => {
1558 let input = LogicalPlanNode::try_from_logical_plan(
1559 a.plan.as_ref(),
1560 extension_codec,
1561 )?;
1562 Ok(LogicalPlanNode {
1563 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1564 protobuf::ExplainNode {
1565 input: Some(Box::new(input)),
1566 verbose: a.verbose,
1567 },
1568 ))),
1569 })
1570 }
1571 LogicalPlan::Union(union) => {
1572 let inputs: Vec<LogicalPlanNode> = union
1573 .inputs
1574 .iter()
1575 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1576 .collect::<Result<_>>()?;
1577 Ok(LogicalPlanNode {
1578 logical_plan_type: Some(LogicalPlanType::Union(
1579 protobuf::UnionNode { inputs },
1580 )),
1581 })
1582 }
1583 LogicalPlan::Extension(extension) => {
1584 let mut buf: Vec<u8> = vec![];
1585 extension_codec.try_encode(extension, &mut buf)?;
1586
1587 let inputs: Vec<LogicalPlanNode> = extension
1588 .node
1589 .inputs()
1590 .iter()
1591 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1592 .collect::<Result<_>>()?;
1593
1594 Ok(LogicalPlanNode {
1595 logical_plan_type: Some(LogicalPlanType::Extension(
1596 LogicalExtensionNode { node: buf, inputs },
1597 )),
1598 })
1599 }
1600 LogicalPlan::Statement(Statement::Prepare(Prepare {
1601 name,
1602 data_types,
1603 input,
1604 })) => {
1605 let input =
1606 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1607 Ok(LogicalPlanNode {
1608 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1609 protobuf::PrepareNode {
1610 name: name.clone(),
1611 data_types: data_types
1612 .iter()
1613 .map(|t| t.try_into())
1614 .collect::<Result<Vec<_>, _>>()?,
1615 input: Some(Box::new(input)),
1616 },
1617 ))),
1618 })
1619 }
1620 LogicalPlan::Unnest(Unnest {
1621 input,
1622 exec_columns,
1623 list_type_columns,
1624 struct_type_columns,
1625 dependency_indices,
1626 schema,
1627 options,
1628 }) => {
1629 let input =
1630 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1631 let proto_unnest_list_items = list_type_columns
1632 .iter()
1633 .map(|(index, ul)| ColumnUnnestListItem {
1634 input_index: *index as _,
1635 recursion: Some(ColumnUnnestListRecursion {
1636 output_column: Some(ul.output_column.to_owned().into()),
1637 depth: ul.depth as _,
1638 }),
1639 })
1640 .collect();
1641 Ok(LogicalPlanNode {
1642 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1643 protobuf::UnnestNode {
1644 input: Some(Box::new(input)),
1645 exec_columns: exec_columns
1646 .iter()
1647 .map(|col| col.into())
1648 .collect(),
1649 list_type_columns: proto_unnest_list_items,
1650 struct_type_columns: struct_type_columns
1651 .iter()
1652 .map(|c| *c as u64)
1653 .collect(),
1654 dependency_indices: dependency_indices
1655 .iter()
1656 .map(|c| *c as u64)
1657 .collect(),
1658 schema: Some(schema.try_into()?),
1659 options: Some(options.into()),
1660 },
1661 ))),
1662 })
1663 }
1664 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1665 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1666 )),
1667 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1668 "LogicalPlan serde is not yet implemented for CreateIndex",
1669 )),
1670 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1671 "LogicalPlan serde is not yet implemented for DropTable",
1672 )),
1673 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1674 name,
1675 if_exists,
1676 schema,
1677 })) => Ok(LogicalPlanNode {
1678 logical_plan_type: Some(LogicalPlanType::DropView(
1679 protobuf::DropViewNode {
1680 name: Some(name.clone().into()),
1681 if_exists: *if_exists,
1682 schema: Some(schema.try_into()?),
1683 },
1684 )),
1685 }),
1686 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1687 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1688 )),
1689 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1690 "LogicalPlan serde is not yet implemented for CreateFunction",
1691 )),
1692 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1693 "LogicalPlan serde is not yet implemented for DropFunction",
1694 )),
1695 LogicalPlan::Statement(_) => Err(proto_error(
1696 "LogicalPlan serde is not yet implemented for Statement",
1697 )),
1698 LogicalPlan::Dml(DmlStatement {
1699 table_name,
1700 target,
1701 op,
1702 input,
1703 ..
1704 }) => {
1705 let input =
1706 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1707 let dml_type: dml_node::Type = op.into();
1708 Ok(LogicalPlanNode {
1709 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1710 input: Some(Box::new(input)),
1711 target: Some(Box::new(from_table_source(
1712 table_name.clone(),
1713 Arc::clone(target),
1714 extension_codec,
1715 )?)),
1716 table_name: Some(table_name.clone().into()),
1717 dml_type: dml_type.into(),
1718 }))),
1719 })
1720 }
1721 LogicalPlan::Copy(dml::CopyTo {
1722 input,
1723 output_url,
1724 file_type,
1725 partition_by,
1726 ..
1727 }) => {
1728 let input =
1729 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1730 let mut buf = Vec::new();
1731 extension_codec
1732 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1733
1734 Ok(LogicalPlanNode {
1735 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1736 protobuf::CopyToNode {
1737 input: Some(Box::new(input)),
1738 output_url: output_url.to_string(),
1739 file_type: buf,
1740 partition_by: partition_by.clone(),
1741 },
1742 ))),
1743 })
1744 }
1745 LogicalPlan::DescribeTable(_) => Err(proto_error(
1746 "LogicalPlan serde is not yet implemented for DescribeTable",
1747 )),
1748 LogicalPlan::RecursiveQuery(recursive) => {
1749 let static_term = LogicalPlanNode::try_from_logical_plan(
1750 recursive.static_term.as_ref(),
1751 extension_codec,
1752 )?;
1753 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1754 recursive.recursive_term.as_ref(),
1755 extension_codec,
1756 )?;
1757
1758 Ok(LogicalPlanNode {
1759 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1760 protobuf::RecursiveQueryNode {
1761 name: recursive.name.clone(),
1762 static_term: Some(Box::new(static_term)),
1763 recursive_term: Some(Box::new(recursive_term)),
1764 is_distinct: recursive.is_distinct,
1765 },
1766 ))),
1767 })
1768 }
1769 }
1770 }
1771}