1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43 file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46 datasource::{
47 file_format::{
48 csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49 },
50 listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51 view::ViewTable,
52 TableProvider,
53 },
54 datasource::{provider_as_source, source_as_provider},
55 prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59 context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60 DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63 dml,
64 logical_plan::{
65 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68 SubqueryAlias, TableScan, Values, Window,
69 },
70 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71 Statement, WindowUDF,
72};
73use datafusion_expr::{
74 AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
75 TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
87pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
88 fn try_decode(buf: &[u8]) -> Result<Self>
89 where
90 Self: Sized;
91
92 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
93 where
94 B: BufMut,
95 Self: Sized;
96
97 fn try_into_logical_plan(
98 &self,
99 ctx: &SessionContext,
100 extension_codec: &dyn LogicalExtensionCodec,
101 ) -> Result<LogicalPlan>;
102
103 fn try_from_logical_plan(
104 plan: &LogicalPlan,
105 extension_codec: &dyn LogicalExtensionCodec,
106 ) -> Result<Self>
107 where
108 Self: Sized;
109}
110
111pub trait LogicalExtensionCodec: Debug + Send + Sync {
112 fn try_decode(
113 &self,
114 buf: &[u8],
115 inputs: &[LogicalPlan],
116 ctx: &SessionContext,
117 ) -> Result<Extension>;
118
119 fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
120
121 fn try_decode_table_provider(
122 &self,
123 buf: &[u8],
124 table_ref: &TableReference,
125 schema: SchemaRef,
126 ctx: &SessionContext,
127 ) -> Result<Arc<dyn TableProvider>>;
128
129 fn try_encode_table_provider(
130 &self,
131 table_ref: &TableReference,
132 node: Arc<dyn TableProvider>,
133 buf: &mut Vec<u8>,
134 ) -> Result<()>;
135
136 fn try_decode_file_format(
137 &self,
138 _buf: &[u8],
139 _ctx: &SessionContext,
140 ) -> Result<Arc<dyn FileFormatFactory>> {
141 not_impl_err!("LogicalExtensionCodec is not provided for file format")
142 }
143
144 fn try_encode_file_format(
145 &self,
146 _buf: &mut Vec<u8>,
147 _node: Arc<dyn FileFormatFactory>,
148 ) -> Result<()> {
149 Ok(())
150 }
151
152 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
153 not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
154 }
155
156 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
157 Ok(())
158 }
159
160 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
161 not_impl_err!(
162 "LogicalExtensionCodec is not provided for aggregate function {name}"
163 )
164 }
165
166 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
167 Ok(())
168 }
169
170 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
171 not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
172 }
173
174 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
175 Ok(())
176 }
177}
178
179#[derive(Debug, Clone)]
180pub struct DefaultLogicalExtensionCodec {}
181
182impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
183 fn try_decode(
184 &self,
185 _buf: &[u8],
186 _inputs: &[LogicalPlan],
187 _ctx: &SessionContext,
188 ) -> Result<Extension> {
189 not_impl_err!("LogicalExtensionCodec is not provided")
190 }
191
192 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
193 not_impl_err!("LogicalExtensionCodec is not provided")
194 }
195
196 fn try_decode_table_provider(
197 &self,
198 _buf: &[u8],
199 _table_ref: &TableReference,
200 _schema: SchemaRef,
201 _ctx: &SessionContext,
202 ) -> Result<Arc<dyn TableProvider>> {
203 not_impl_err!("LogicalExtensionCodec is not provided")
204 }
205
206 fn try_encode_table_provider(
207 &self,
208 _table_ref: &TableReference,
209 _node: Arc<dyn TableProvider>,
210 _buf: &mut Vec<u8>,
211 ) -> Result<()> {
212 not_impl_err!("LogicalExtensionCodec is not provided")
213 }
214}
215
/// Decodes an optional boxed child plan node.
///
/// `$PB` is an `Option<Box<Node>>`-like field. When it is present, the node's
/// `try_into_logical_plan($CTX, $CODEC)` result is returned. When it is
/// absent, `Err(proto_error("Missing required field in protobuf"))` is
/// returned. `proto_error` must be in scope at the call site.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(child) => child.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err(proto_error("Missing required field in protobuf")),
        }
    }};
}
226
227fn from_table_reference(
228 table_ref: Option<&protobuf::TableReference>,
229 error_context: &str,
230) -> Result<TableReference> {
231 let table_ref = table_ref.ok_or_else(|| {
232 DataFusionError::Internal(format!(
233 "Protobuf deserialization error, {error_context} was missing required field name."
234 ))
235 })?;
236
237 Ok(table_ref.clone().try_into()?)
238}
239
240fn to_table_source(
244 node: &Option<Box<LogicalPlanNode>>,
245 ctx: &SessionContext,
246 extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248 if let Some(node) = node {
249 match node.try_into_logical_plan(ctx, extension_codec)? {
250 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251 _ => plan_err!("expected TableScan node"),
252 }
253 } else {
254 plan_err!("LogicalPlanNode should be provided")
255 }
256}
257
258fn from_table_source(
262 table_name: TableReference,
263 target: Arc<dyn TableSource>,
264 extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266 let projected_schema = target.schema().to_dfschema_ref()?;
267 let r = LogicalPlan::TableScan(TableScan {
268 table_name,
269 source: target,
270 projection: None,
271 projected_schema,
272 filters: vec![],
273 fetch: None,
274 });
275
276 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280 fn try_decode(buf: &[u8]) -> Result<Self>
281 where
282 Self: Sized,
283 {
284 LogicalPlanNode::decode(buf).map_err(|e| {
285 DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286 })
287 }
288
289 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290 where
291 B: BufMut,
292 Self: Sized,
293 {
294 self.encode(buf).map_err(|e| {
295 DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296 })
297 }
298
299 fn try_into_logical_plan(
300 &self,
301 ctx: &SessionContext,
302 extension_codec: &dyn LogicalExtensionCodec,
303 ) -> Result<LogicalPlan> {
304 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
305 proto_error(format!(
306 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
307 ))
308 })?;
309 match plan {
310 LogicalPlanType::Values(values) => {
311 let n_cols = values.n_cols as usize;
312 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
313 Ok(Vec::new())
314 } else if values.values_list.len() % n_cols != 0 {
315 internal_err!(
316 "Invalid values list length, expect {} to be divisible by {}",
317 values.values_list.len(),
318 n_cols
319 )
320 } else {
321 values
322 .values_list
323 .chunks_exact(n_cols)
324 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
325 .collect::<Result<Vec<_>, _>>()
326 .map_err(|e| e.into())
327 }?;
328
329 LogicalPlanBuilder::values(values)?.build()
330 }
331 LogicalPlanType::Projection(projection) => {
332 let input: LogicalPlan =
333 into_logical_plan!(projection.input, ctx, extension_codec)?;
334 let expr: Vec<Expr> =
335 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
336
337 let new_proj = project(input, expr)?;
338 match projection.optional_alias.as_ref() {
339 Some(a) => match a {
340 protobuf::projection_node::OptionalAlias::Alias(alias) => {
341 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
342 Arc::new(new_proj),
343 alias.clone(),
344 )?))
345 }
346 },
347 _ => Ok(new_proj),
348 }
349 }
350 LogicalPlanType::Selection(selection) => {
351 let input: LogicalPlan =
352 into_logical_plan!(selection.input, ctx, extension_codec)?;
353 let expr: Expr = selection
354 .expr
355 .as_ref()
356 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
357 .transpose()?
358 .ok_or_else(|| {
359 DataFusionError::Internal("expression required".to_string())
360 })?;
361 LogicalPlanBuilder::from(input).filter(expr)?.build()
363 }
364 LogicalPlanType::Window(window) => {
365 let input: LogicalPlan =
366 into_logical_plan!(window.input, ctx, extension_codec)?;
367 let window_expr =
368 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
369 LogicalPlanBuilder::from(input).window(window_expr)?.build()
370 }
371 LogicalPlanType::Aggregate(aggregate) => {
372 let input: LogicalPlan =
373 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
374 let group_expr =
375 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
376 let aggr_expr =
377 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
378 LogicalPlanBuilder::from(input)
379 .aggregate(group_expr, aggr_expr)?
380 .build()
381 }
382 LogicalPlanType::ListingScan(scan) => {
383 let schema: Schema = convert_required!(scan.schema)?;
384
385 let mut projection = None;
386 if let Some(columns) = &scan.projection {
387 let column_indices = columns
388 .columns
389 .iter()
390 .map(|name| schema.index_of(name))
391 .collect::<Result<Vec<usize>, _>>()?;
392 projection = Some(column_indices);
393 }
394
395 let filters =
396 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
397
398 let mut all_sort_orders = vec![];
399 for order in &scan.file_sort_order {
400 all_sort_orders.push(from_proto::parse_sorts(
401 &order.sort_expr_nodes,
402 ctx,
403 extension_codec,
404 )?)
405 }
406
407 let file_format: Arc<dyn FileFormat> =
408 match scan.file_format_type.as_ref().ok_or_else(|| {
409 proto_error(format!(
410 "logical_plan::from_proto() Unsupported file format '{self:?}'"
411 ))
412 })? {
413 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
414 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
415 #[cfg(feature = "parquet")]
416 {
417 let mut parquet = ParquetFormat::default();
418 if let Some(options) = options {
419 parquet = parquet.with_options(options.try_into()?)
420 }
421 Arc::new(parquet)
422 }
423 #[cfg(not(feature = "parquet"))]
424 panic!("Unable to process parquet file since `parquet` feature is not enabled");
425 }
426 FileFormatType::Csv(protobuf::CsvFormat {
427 options
428 }) => {
429 let mut csv = CsvFormat::default();
430 if let Some(options) = options {
431 csv = csv.with_options(options.try_into()?)
432 }
433 Arc::new(csv)
434 },
435 FileFormatType::Json(protobuf::NdJsonFormat {
436 options
437 }) => {
438 let mut json = OtherNdJsonFormat::default();
439 if let Some(options) = options {
440 json = json.with_options(options.try_into()?)
441 }
442 Arc::new(json)
443 }
444 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
445 FileFormatType::Avro(..) => {
446 #[cfg(feature = "avro")]
447 {
448 Arc::new(AvroFormat)
449 }
450 #[cfg(not(feature = "avro"))]
451 panic!("Unable to process avro file since `avro` feature is not enabled");
452 }
453 };
454
455 let table_paths = &scan
456 .paths
457 .iter()
458 .map(ListingTableUrl::parse)
459 .collect::<Result<Vec<_>, _>>()?;
460
461 let options = ListingOptions::new(file_format)
462 .with_file_extension(&scan.file_extension)
463 .with_table_partition_cols(
464 scan.table_partition_cols
465 .iter()
466 .map(|col| {
467 (
468 col.clone(),
469 schema
470 .field_with_name(col)
471 .unwrap()
472 .data_type()
473 .clone(),
474 )
475 })
476 .collect(),
477 )
478 .with_collect_stat(scan.collect_stat)
479 .with_target_partitions(scan.target_partitions as usize)
480 .with_file_sort_order(all_sort_orders);
481
482 let config =
483 ListingTableConfig::new_with_multi_paths(table_paths.clone())
484 .with_listing_options(options)
485 .with_schema(Arc::new(schema));
486
487 let provider = ListingTable::try_new(config)?.with_cache(
488 ctx.state()
489 .runtime_env()
490 .cache_manager
491 .get_file_statistic_cache(),
492 );
493
494 let table_name =
495 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
496
497 LogicalPlanBuilder::scan_with_filters(
498 table_name,
499 provider_as_source(Arc::new(provider)),
500 projection,
501 filters,
502 )?
503 .build()
504 }
505 LogicalPlanType::CustomScan(scan) => {
506 let schema: Schema = convert_required!(scan.schema)?;
507 let schema = Arc::new(schema);
508 let mut projection = None;
509 if let Some(columns) = &scan.projection {
510 let column_indices = columns
511 .columns
512 .iter()
513 .map(|name| schema.index_of(name))
514 .collect::<Result<Vec<usize>, _>>()?;
515 projection = Some(column_indices);
516 }
517
518 let filters =
519 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
520
521 let table_name =
522 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
523
524 let provider = extension_codec.try_decode_table_provider(
525 &scan.custom_table_data,
526 &table_name,
527 schema,
528 ctx,
529 )?;
530
531 LogicalPlanBuilder::scan_with_filters(
532 table_name,
533 provider_as_source(provider),
534 projection,
535 filters,
536 )?
537 .build()
538 }
539 LogicalPlanType::Sort(sort) => {
540 let input: LogicalPlan =
541 into_logical_plan!(sort.input, ctx, extension_codec)?;
542 let sort_expr: Vec<SortExpr> =
543 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
544 let fetch: Option<usize> = sort.fetch.try_into().ok();
545 LogicalPlanBuilder::from(input)
546 .sort_with_limit(sort_expr, fetch)?
547 .build()
548 }
549 LogicalPlanType::Repartition(repartition) => {
550 use datafusion::logical_expr::Partitioning;
551 let input: LogicalPlan =
552 into_logical_plan!(repartition.input, ctx, extension_codec)?;
553 use protobuf::repartition_node::PartitionMethod;
554 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
555 internal_datafusion_err!(
556 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
557 )
558 })?;
559
560 let partitioning_scheme = match pb_partition_method {
561 PartitionMethod::Hash(protobuf::HashRepartition {
562 hash_expr: pb_hash_expr,
563 partition_count,
564 }) => Partitioning::Hash(
565 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
566 *partition_count as usize,
567 ),
568 PartitionMethod::RoundRobin(partition_count) => {
569 Partitioning::RoundRobinBatch(*partition_count as usize)
570 }
571 };
572
573 LogicalPlanBuilder::from(input)
574 .repartition(partitioning_scheme)?
575 .build()
576 }
577 LogicalPlanType::EmptyRelation(empty_relation) => {
578 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
579 }
580 LogicalPlanType::CreateExternalTable(create_extern_table) => {
581 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
582 DataFusionError::Internal(String::from(
583 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
584 ))
585 })?;
586
587 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
588 DataFusionError::Internal(String::from(
589 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
590 ))
591 })?;
592 let definition = if !create_extern_table.definition.is_empty() {
593 Some(create_extern_table.definition.clone())
594 } else {
595 None
596 };
597
598 let file_type = create_extern_table.file_type.as_str();
599 if ctx.table_factory(file_type).is_none() {
600 internal_err!("No TableProviderFactory for file type: {file_type}")?
601 }
602
603 let mut order_exprs = vec![];
604 for expr in &create_extern_table.order_exprs {
605 order_exprs.push(from_proto::parse_sorts(
606 &expr.sort_expr_nodes,
607 ctx,
608 extension_codec,
609 )?);
610 }
611
612 let mut column_defaults =
613 HashMap::with_capacity(create_extern_table.column_defaults.len());
614 for (col_name, expr) in &create_extern_table.column_defaults {
615 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
616 column_defaults.insert(col_name.clone(), expr);
617 }
618
619 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
620 CreateExternalTable {
621 schema: pb_schema.try_into()?,
622 name: from_table_reference(
623 create_extern_table.name.as_ref(),
624 "CreateExternalTable",
625 )?,
626 location: create_extern_table.location.clone(),
627 file_type: create_extern_table.file_type.clone(),
628 table_partition_cols: create_extern_table
629 .table_partition_cols
630 .clone(),
631 order_exprs,
632 if_not_exists: create_extern_table.if_not_exists,
633 temporary: create_extern_table.temporary,
634 definition,
635 unbounded: create_extern_table.unbounded,
636 options: create_extern_table.options.clone(),
637 constraints: constraints.into(),
638 column_defaults,
639 },
640 )))
641 }
642 LogicalPlanType::CreateView(create_view) => {
643 let plan = create_view
644 .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
645 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
646 )))?
647 .try_into_logical_plan(ctx, extension_codec)?;
648 let definition = if !create_view.definition.is_empty() {
649 Some(create_view.definition.clone())
650 } else {
651 None
652 };
653
654 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
655 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
656 temporary: create_view.temporary,
657 input: Arc::new(plan),
658 or_replace: create_view.or_replace,
659 definition,
660 })))
661 }
662 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
663 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
664 DataFusionError::Internal(String::from(
665 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
666 ))
667 })?;
668
669 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
670 CreateCatalogSchema {
671 schema_name: create_catalog_schema.schema_name.clone(),
672 if_not_exists: create_catalog_schema.if_not_exists,
673 schema: pb_schema.try_into()?,
674 },
675 )))
676 }
677 LogicalPlanType::CreateCatalog(create_catalog) => {
678 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
679 DataFusionError::Internal(String::from(
680 "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
681 ))
682 })?;
683
684 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
685 CreateCatalog {
686 catalog_name: create_catalog.catalog_name.clone(),
687 if_not_exists: create_catalog.if_not_exists,
688 schema: pb_schema.try_into()?,
689 },
690 )))
691 }
692 LogicalPlanType::Analyze(analyze) => {
693 let input: LogicalPlan =
694 into_logical_plan!(analyze.input, ctx, extension_codec)?;
695 LogicalPlanBuilder::from(input)
696 .explain(analyze.verbose, true)?
697 .build()
698 }
699 LogicalPlanType::Explain(explain) => {
700 let input: LogicalPlan =
701 into_logical_plan!(explain.input, ctx, extension_codec)?;
702 LogicalPlanBuilder::from(input)
703 .explain(explain.verbose, false)?
704 .build()
705 }
706 LogicalPlanType::SubqueryAlias(aliased_relation) => {
707 let input: LogicalPlan =
708 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
709 let alias = from_table_reference(
710 aliased_relation.alias.as_ref(),
711 "SubqueryAlias",
712 )?;
713 LogicalPlanBuilder::from(input).alias(alias)?.build()
714 }
715 LogicalPlanType::Limit(limit) => {
716 let input: LogicalPlan =
717 into_logical_plan!(limit.input, ctx, extension_codec)?;
718 let skip = limit.skip.max(0) as usize;
719
720 let fetch = if limit.fetch < 0 {
721 None
722 } else {
723 Some(limit.fetch as usize)
724 };
725
726 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
727 }
728 LogicalPlanType::Join(join) => {
729 let left_keys: Vec<Expr> =
730 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
731 let right_keys: Vec<Expr> =
732 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
733 let join_type =
734 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
735 proto_error(format!(
736 "Received a JoinNode message with unknown JoinType {}",
737 join.join_type
738 ))
739 })?;
740 let join_constraint = protobuf::JoinConstraint::try_from(
741 join.join_constraint,
742 )
743 .map_err(|_| {
744 proto_error(format!(
745 "Received a JoinNode message with unknown JoinConstraint {}",
746 join.join_constraint
747 ))
748 })?;
749 let filter: Option<Expr> = join
750 .filter
751 .as_ref()
752 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
753 .map_or(Ok(None), |v| v.map(Some))?;
754
755 let builder = LogicalPlanBuilder::from(into_logical_plan!(
756 join.left,
757 ctx,
758 extension_codec
759 )?);
760 let builder = match join_constraint.into() {
761 JoinConstraint::On => builder.join_with_expr_keys(
762 into_logical_plan!(join.right, ctx, extension_codec)?,
763 join_type.into(),
764 (left_keys, right_keys),
765 filter,
766 )?,
767 JoinConstraint::Using => {
768 let using_keys = left_keys
770 .into_iter()
771 .map(|key| {
772 key.try_as_col().cloned()
773 .ok_or_else(|| internal_datafusion_err!(
774 "Using join keys must be column references, got: {key:?}"
775 ))
776 })
777 .collect::<Result<Vec<_>, _>>()?;
778 builder.join_using(
779 into_logical_plan!(join.right, ctx, extension_codec)?,
780 join_type.into(),
781 using_keys,
782 )?
783 }
784 };
785
786 builder.build()
787 }
788 LogicalPlanType::Union(union) => {
789 if union.inputs.len() < 2 {
790 return Err( DataFusionError::Internal(String::from(
791 "Protobuf deserialization error, Union was require at least two input.",
792 )));
793 }
794 let (first, rest) = union.inputs.split_first().unwrap();
795 let mut builder = LogicalPlanBuilder::from(
796 first.try_into_logical_plan(ctx, extension_codec)?,
797 );
798
799 for i in rest {
800 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
801 builder = builder.union(plan)?;
802 }
803 builder.build()
804 }
805 LogicalPlanType::CrossJoin(crossjoin) => {
806 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
807 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
808
809 LogicalPlanBuilder::from(left).cross_join(right)?.build()
810 }
811 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
812 let input_plans: Vec<LogicalPlan> = inputs
813 .iter()
814 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
815 .collect::<Result<_>>()?;
816
817 let extension_node =
818 extension_codec.try_decode(node, &input_plans, ctx)?;
819 Ok(LogicalPlan::Extension(extension_node))
820 }
821 LogicalPlanType::Distinct(distinct) => {
822 let input: LogicalPlan =
823 into_logical_plan!(distinct.input, ctx, extension_codec)?;
824 LogicalPlanBuilder::from(input).distinct()?.build()
825 }
826 LogicalPlanType::DistinctOn(distinct_on) => {
827 let input: LogicalPlan =
828 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
829 let on_expr =
830 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
831 let select_expr = from_proto::parse_exprs(
832 &distinct_on.select_expr,
833 ctx,
834 extension_codec,
835 )?;
836 let sort_expr = match distinct_on.sort_expr.len() {
837 0 => None,
838 _ => Some(from_proto::parse_sorts(
839 &distinct_on.sort_expr,
840 ctx,
841 extension_codec,
842 )?),
843 };
844 LogicalPlanBuilder::from(input)
845 .distinct_on(on_expr, select_expr, sort_expr)?
846 .build()
847 }
848 LogicalPlanType::ViewScan(scan) => {
849 let schema: Schema = convert_required!(scan.schema)?;
850
851 let mut projection = None;
852 if let Some(columns) = &scan.projection {
853 let column_indices = columns
854 .columns
855 .iter()
856 .map(|name| schema.index_of(name))
857 .collect::<Result<Vec<usize>, _>>()?;
858 projection = Some(column_indices);
859 }
860
861 let input: LogicalPlan =
862 into_logical_plan!(scan.input, ctx, extension_codec)?;
863
864 let definition = if !scan.definition.is_empty() {
865 Some(scan.definition.clone())
866 } else {
867 None
868 };
869
870 let provider = ViewTable::new(input, definition);
871
872 let table_name =
873 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
874
875 LogicalPlanBuilder::scan(
876 table_name,
877 provider_as_source(Arc::new(provider)),
878 projection,
879 )?
880 .build()
881 }
882 LogicalPlanType::Prepare(prepare) => {
883 let input: LogicalPlan =
884 into_logical_plan!(prepare.input, ctx, extension_codec)?;
885 let data_types: Vec<DataType> = prepare
886 .data_types
887 .iter()
888 .map(DataType::try_from)
889 .collect::<Result<_, _>>()?;
890 LogicalPlanBuilder::from(input)
891 .prepare(prepare.name.clone(), data_types)?
892 .build()
893 }
894 LogicalPlanType::DropView(dropview) => {
895 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
896 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
897 if_exists: dropview.if_exists,
898 schema: Arc::new(convert_required!(dropview.schema)?),
899 })))
900 }
901 LogicalPlanType::CopyTo(copy) => {
902 let input: LogicalPlan =
903 into_logical_plan!(copy.input, ctx, extension_codec)?;
904
905 let file_type: Arc<dyn FileType> = format_as_file_type(
906 extension_codec.try_decode_file_format(©.file_type, ctx)?,
907 );
908
909 Ok(LogicalPlan::Copy(dml::CopyTo {
910 input: Arc::new(input),
911 output_url: copy.output_url.clone(),
912 partition_by: copy.partition_by.clone(),
913 file_type,
914 options: Default::default(),
915 }))
916 }
917 LogicalPlanType::Unnest(unnest) => {
918 let input: LogicalPlan =
919 into_logical_plan!(unnest.input, ctx, extension_codec)?;
920 Ok(LogicalPlan::Unnest(Unnest {
921 input: Arc::new(input),
922 exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
923 list_type_columns: unnest
924 .list_type_columns
925 .iter()
926 .map(|c| {
927 let recursion_item = c.recursion.as_ref().unwrap();
928 (
929 c.input_index as _,
930 ColumnUnnestList {
931 output_column: recursion_item
932 .output_column
933 .as_ref()
934 .unwrap()
935 .into(),
936 depth: recursion_item.depth as _,
937 },
938 )
939 })
940 .collect(),
941 struct_type_columns: unnest
942 .struct_type_columns
943 .iter()
944 .map(|c| *c as usize)
945 .collect(),
946 dependency_indices: unnest
947 .dependency_indices
948 .iter()
949 .map(|c| *c as usize)
950 .collect(),
951 schema: Arc::new(convert_required!(unnest.schema)?),
952 options: into_required!(unnest.options)?,
953 }))
954 }
955 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
956 let static_term = recursive_query_node
957 .static_term
958 .as_ref()
959 .ok_or_else(|| DataFusionError::Internal(String::from(
960 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
961 )))?
962 .try_into_logical_plan(ctx, extension_codec)?;
963
964 let recursive_term = recursive_query_node
965 .recursive_term
966 .as_ref()
967 .ok_or_else(|| DataFusionError::Internal(String::from(
968 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
969 )))?
970 .try_into_logical_plan(ctx, extension_codec)?;
971
972 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
973 name: recursive_query_node.name.clone(),
974 static_term: Arc::new(static_term),
975 recursive_term: Arc::new(recursive_term),
976 is_distinct: recursive_query_node.is_distinct,
977 }))
978 }
979 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
980 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
981 let schema = convert_required!(*schema)?;
982 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
983 LogicalPlanBuilder::scan(
984 name.as_str(),
985 provider_as_source(Arc::new(cte_work_table)),
986 None,
987 )?
988 .build()
989 }
990 LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
991 datafusion::logical_expr::DmlStatement::new(
992 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
993 to_table_source(&dml_node.target, ctx, extension_codec)?,
994 dml_node.dml_type().into(),
995 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
996 ),
997 )),
998 }
999 }
1000
1001 fn try_from_logical_plan(
1002 plan: &LogicalPlan,
1003 extension_codec: &dyn LogicalExtensionCodec,
1004 ) -> Result<Self>
1005 where
1006 Self: Sized,
1007 {
1008 match plan {
1009 LogicalPlan::Values(Values { values, .. }) => {
1010 let n_cols = if values.is_empty() {
1011 0
1012 } else {
1013 values[0].len()
1014 } as u64;
1015 let values_list =
1016 serialize_exprs(values.iter().flatten(), extension_codec)?;
1017 Ok(LogicalPlanNode {
1018 logical_plan_type: Some(LogicalPlanType::Values(
1019 protobuf::ValuesNode {
1020 n_cols,
1021 values_list,
1022 },
1023 )),
1024 })
1025 }
1026 LogicalPlan::TableScan(TableScan {
1027 table_name,
1028 source,
1029 filters,
1030 projection,
1031 ..
1032 }) => {
1033 let provider = source_as_provider(source)?;
1034 let schema = provider.schema();
1035 let source = provider.as_any();
1036
1037 let projection = match projection {
1038 None => None,
1039 Some(columns) => {
1040 let column_names = columns
1041 .iter()
1042 .map(|i| schema.field(*i).name().to_owned())
1043 .collect();
1044 Some(protobuf::ProjectionColumns {
1045 columns: column_names,
1046 })
1047 }
1048 };
1049 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1050
1051 let filters: Vec<protobuf::LogicalExprNode> =
1052 serialize_exprs(filters, extension_codec)?;
1053
1054 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1055 let any = listing_table.options().format.as_any();
1056 let file_format_type = {
1057 let mut maybe_some_type = None;
1058
1059 #[cfg(feature = "parquet")]
1060 if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1061 let options = parquet.options();
1062 maybe_some_type =
1063 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1064 options: Some(options.try_into()?),
1065 }));
1066 };
1067
1068 if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1069 let options = csv.options();
1070 maybe_some_type =
1071 Some(FileFormatType::Csv(protobuf::CsvFormat {
1072 options: Some(options.try_into()?),
1073 }));
1074 }
1075
1076 if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1077 let options = json.options();
1078 maybe_some_type =
1079 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1080 options: Some(options.try_into()?),
1081 }))
1082 }
1083
1084 #[cfg(feature = "avro")]
1085 if any.is::<AvroFormat>() {
1086 maybe_some_type =
1087 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1088 }
1089
1090 if let Some(file_format_type) = maybe_some_type {
1091 file_format_type
1092 } else {
1093 return Err(proto_error(format!(
1094 "Error converting file format, {:?} is invalid as a datafusion format.",
1095 listing_table.options().format
1096 )));
1097 }
1098 };
1099
1100 let options = listing_table.options();
1101
1102 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1103 for order in &options.file_sort_order {
1104 let expr_vec = SortExprNodeCollection {
1105 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1106 };
1107 exprs_vec.push(expr_vec);
1108 }
1109
1110 Ok(LogicalPlanNode {
1111 logical_plan_type: Some(LogicalPlanType::ListingScan(
1112 protobuf::ListingTableScanNode {
1113 file_format_type: Some(file_format_type),
1114 table_name: Some(table_name.clone().into()),
1115 collect_stat: options.collect_stat,
1116 file_extension: options.file_extension.clone(),
1117 table_partition_cols: options
1118 .table_partition_cols
1119 .iter()
1120 .map(|x| x.0.clone())
1121 .collect::<Vec<_>>(),
1122 paths: listing_table
1123 .table_paths()
1124 .iter()
1125 .map(|x| x.to_string())
1126 .collect(),
1127 schema: Some(schema),
1128 projection,
1129 filters,
1130 target_partitions: options.target_partitions as u32,
1131 file_sort_order: exprs_vec,
1132 },
1133 )),
1134 })
1135 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1136 Ok(LogicalPlanNode {
1137 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1138 protobuf::ViewTableScanNode {
1139 table_name: Some(table_name.clone().into()),
1140 input: Some(Box::new(
1141 LogicalPlanNode::try_from_logical_plan(
1142 view_table.logical_plan(),
1143 extension_codec,
1144 )?,
1145 )),
1146 schema: Some(schema),
1147 projection,
1148 definition: view_table
1149 .definition()
1150 .map(|s| s.to_string())
1151 .unwrap_or_default(),
1152 },
1153 ))),
1154 })
1155 } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1156 {
1157 let name = cte_work_table.name().to_string();
1158 let schema = cte_work_table.schema();
1159 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1160
1161 Ok(LogicalPlanNode {
1162 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1163 protobuf::CteWorkTableScanNode {
1164 name,
1165 schema: Some(schema),
1166 },
1167 )),
1168 })
1169 } else {
1170 let mut bytes = vec![];
1171 extension_codec
1172 .try_encode_table_provider(table_name, provider, &mut bytes)
1173 .map_err(|e| context!("Error serializing custom table", e))?;
1174 let scan = CustomScan(CustomTableScanNode {
1175 table_name: Some(table_name.clone().into()),
1176 projection,
1177 schema: Some(schema),
1178 filters,
1179 custom_table_data: bytes,
1180 });
1181 let node = LogicalPlanNode {
1182 logical_plan_type: Some(scan),
1183 };
1184 Ok(node)
1185 }
1186 }
1187 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1188 Ok(LogicalPlanNode {
1189 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1190 protobuf::ProjectionNode {
1191 input: Some(Box::new(
1192 LogicalPlanNode::try_from_logical_plan(
1193 input.as_ref(),
1194 extension_codec,
1195 )?,
1196 )),
1197 expr: serialize_exprs(expr, extension_codec)?,
1198 optional_alias: None,
1199 },
1200 ))),
1201 })
1202 }
1203 LogicalPlan::Filter(filter) => {
1204 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1205 filter.input.as_ref(),
1206 extension_codec,
1207 )?;
1208 Ok(LogicalPlanNode {
1209 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1210 protobuf::SelectionNode {
1211 input: Some(Box::new(input)),
1212 expr: Some(serialize_expr(
1213 &filter.predicate,
1214 extension_codec,
1215 )?),
1216 },
1217 ))),
1218 })
1219 }
1220 LogicalPlan::Distinct(Distinct::All(input)) => {
1221 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1222 input.as_ref(),
1223 extension_codec,
1224 )?;
1225 Ok(LogicalPlanNode {
1226 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1227 protobuf::DistinctNode {
1228 input: Some(Box::new(input)),
1229 },
1230 ))),
1231 })
1232 }
1233 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1234 on_expr,
1235 select_expr,
1236 sort_expr,
1237 input,
1238 ..
1239 })) => {
1240 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1241 input.as_ref(),
1242 extension_codec,
1243 )?;
1244 let sort_expr = match sort_expr {
1245 None => vec![],
1246 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1247 };
1248 Ok(LogicalPlanNode {
1249 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1250 protobuf::DistinctOnNode {
1251 on_expr: serialize_exprs(on_expr, extension_codec)?,
1252 select_expr: serialize_exprs(select_expr, extension_codec)?,
1253 sort_expr,
1254 input: Some(Box::new(input)),
1255 },
1256 ))),
1257 })
1258 }
1259 LogicalPlan::Window(Window {
1260 input, window_expr, ..
1261 }) => {
1262 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1263 input.as_ref(),
1264 extension_codec,
1265 )?;
1266 Ok(LogicalPlanNode {
1267 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1268 protobuf::WindowNode {
1269 input: Some(Box::new(input)),
1270 window_expr: serialize_exprs(window_expr, extension_codec)?,
1271 },
1272 ))),
1273 })
1274 }
1275 LogicalPlan::Aggregate(Aggregate {
1276 group_expr,
1277 aggr_expr,
1278 input,
1279 ..
1280 }) => {
1281 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1282 input.as_ref(),
1283 extension_codec,
1284 )?;
1285 Ok(LogicalPlanNode {
1286 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1287 protobuf::AggregateNode {
1288 input: Some(Box::new(input)),
1289 group_expr: serialize_exprs(group_expr, extension_codec)?,
1290 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1291 },
1292 ))),
1293 })
1294 }
1295 LogicalPlan::Join(Join {
1296 left,
1297 right,
1298 on,
1299 filter,
1300 join_type,
1301 join_constraint,
1302 null_equals_null,
1303 ..
1304 }) => {
1305 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1306 left.as_ref(),
1307 extension_codec,
1308 )?;
1309 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1310 right.as_ref(),
1311 extension_codec,
1312 )?;
1313 let (left_join_key, right_join_key) = on
1314 .iter()
1315 .map(|(l, r)| {
1316 Ok((
1317 serialize_expr(l, extension_codec)?,
1318 serialize_expr(r, extension_codec)?,
1319 ))
1320 })
1321 .collect::<Result<Vec<_>, ToProtoError>>()?
1322 .into_iter()
1323 .unzip();
1324 let join_type: protobuf::JoinType = join_type.to_owned().into();
1325 let join_constraint: protobuf::JoinConstraint =
1326 join_constraint.to_owned().into();
1327 let filter = filter
1328 .as_ref()
1329 .map(|e| serialize_expr(e, extension_codec))
1330 .map_or(Ok(None), |v| v.map(Some))?;
1331 Ok(LogicalPlanNode {
1332 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1333 protobuf::JoinNode {
1334 left: Some(Box::new(left)),
1335 right: Some(Box::new(right)),
1336 join_type: join_type.into(),
1337 join_constraint: join_constraint.into(),
1338 left_join_key,
1339 right_join_key,
1340 null_equals_null: *null_equals_null,
1341 filter,
1342 },
1343 ))),
1344 })
1345 }
1346 LogicalPlan::Subquery(_) => {
1347 not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1348 }
1349 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1350 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1351 input.as_ref(),
1352 extension_codec,
1353 )?;
1354 Ok(LogicalPlanNode {
1355 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1356 protobuf::SubqueryAliasNode {
1357 input: Some(Box::new(input)),
1358 alias: Some((*alias).clone().into()),
1359 },
1360 ))),
1361 })
1362 }
1363 LogicalPlan::Limit(limit) => {
1364 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1365 limit.input.as_ref(),
1366 extension_codec,
1367 )?;
1368 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1369 return Err(proto_error(
1370 "LogicalPlan::Limit only supports literal skip values",
1371 ));
1372 };
1373 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1374 return Err(proto_error(
1375 "LogicalPlan::Limit only supports literal fetch values",
1376 ));
1377 };
1378
1379 Ok(LogicalPlanNode {
1380 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1381 protobuf::LimitNode {
1382 input: Some(Box::new(input)),
1383 skip: skip as i64,
1384 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1385 },
1386 ))),
1387 })
1388 }
1389 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1390 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1391 input.as_ref(),
1392 extension_codec,
1393 )?;
1394 let sort_expr: Vec<protobuf::SortExprNode> =
1395 serialize_sorts(expr, extension_codec)?;
1396 Ok(LogicalPlanNode {
1397 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1398 protobuf::SortNode {
1399 input: Some(Box::new(input)),
1400 expr: sort_expr,
1401 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1402 },
1403 ))),
1404 })
1405 }
1406 LogicalPlan::Repartition(Repartition {
1407 input,
1408 partitioning_scheme,
1409 }) => {
1410 use datafusion::logical_expr::Partitioning;
1411 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1412 input.as_ref(),
1413 extension_codec,
1414 )?;
1415
1416 use protobuf::repartition_node::PartitionMethod;
1419
1420 let pb_partition_method = match partitioning_scheme {
1421 Partitioning::Hash(exprs, partition_count) => {
1422 PartitionMethod::Hash(protobuf::HashRepartition {
1423 hash_expr: serialize_exprs(exprs, extension_codec)?,
1424 partition_count: *partition_count as u64,
1425 })
1426 }
1427 Partitioning::RoundRobinBatch(partition_count) => {
1428 PartitionMethod::RoundRobin(*partition_count as u64)
1429 }
1430 Partitioning::DistributeBy(_) => {
1431 return not_impl_err!("DistributeBy")
1432 }
1433 };
1434
1435 Ok(LogicalPlanNode {
1436 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1437 protobuf::RepartitionNode {
1438 input: Some(Box::new(input)),
1439 partition_method: Some(pb_partition_method),
1440 },
1441 ))),
1442 })
1443 }
1444 LogicalPlan::EmptyRelation(EmptyRelation {
1445 produce_one_row, ..
1446 }) => Ok(LogicalPlanNode {
1447 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1448 protobuf::EmptyRelationNode {
1449 produce_one_row: *produce_one_row,
1450 },
1451 )),
1452 }),
1453 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1454 CreateExternalTable {
1455 name,
1456 location,
1457 file_type,
1458 schema: df_schema,
1459 table_partition_cols,
1460 if_not_exists,
1461 definition,
1462 order_exprs,
1463 unbounded,
1464 options,
1465 constraints,
1466 column_defaults,
1467 temporary,
1468 },
1469 )) => {
1470 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1471 for order in order_exprs {
1472 let temp = SortExprNodeCollection {
1473 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1474 };
1475 converted_order_exprs.push(temp);
1476 }
1477
1478 let mut converted_column_defaults =
1479 HashMap::with_capacity(column_defaults.len());
1480 for (col_name, expr) in column_defaults {
1481 converted_column_defaults
1482 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1483 }
1484
1485 Ok(LogicalPlanNode {
1486 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1487 protobuf::CreateExternalTableNode {
1488 name: Some(name.clone().into()),
1489 location: location.clone(),
1490 file_type: file_type.clone(),
1491 schema: Some(df_schema.try_into()?),
1492 table_partition_cols: table_partition_cols.clone(),
1493 if_not_exists: *if_not_exists,
1494 temporary: *temporary,
1495 order_exprs: converted_order_exprs,
1496 definition: definition.clone().unwrap_or_default(),
1497 unbounded: *unbounded,
1498 options: options.clone(),
1499 constraints: Some(constraints.clone().into()),
1500 column_defaults: converted_column_defaults,
1501 },
1502 )),
1503 })
1504 }
1505 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1506 name,
1507 input,
1508 or_replace,
1509 definition,
1510 temporary,
1511 })) => Ok(LogicalPlanNode {
1512 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1513 protobuf::CreateViewNode {
1514 name: Some(name.clone().into()),
1515 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1516 input,
1517 extension_codec,
1518 )?)),
1519 or_replace: *or_replace,
1520 temporary: *temporary,
1521 definition: definition.clone().unwrap_or_default(),
1522 },
1523 ))),
1524 }),
1525 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1526 CreateCatalogSchema {
1527 schema_name,
1528 if_not_exists,
1529 schema: df_schema,
1530 },
1531 )) => Ok(LogicalPlanNode {
1532 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1533 protobuf::CreateCatalogSchemaNode {
1534 schema_name: schema_name.clone(),
1535 if_not_exists: *if_not_exists,
1536 schema: Some(df_schema.try_into()?),
1537 },
1538 )),
1539 }),
1540 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1541 catalog_name,
1542 if_not_exists,
1543 schema: df_schema,
1544 })) => Ok(LogicalPlanNode {
1545 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1546 protobuf::CreateCatalogNode {
1547 catalog_name: catalog_name.clone(),
1548 if_not_exists: *if_not_exists,
1549 schema: Some(df_schema.try_into()?),
1550 },
1551 )),
1552 }),
1553 LogicalPlan::Analyze(a) => {
1554 let input = LogicalPlanNode::try_from_logical_plan(
1555 a.input.as_ref(),
1556 extension_codec,
1557 )?;
1558 Ok(LogicalPlanNode {
1559 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1560 protobuf::AnalyzeNode {
1561 input: Some(Box::new(input)),
1562 verbose: a.verbose,
1563 },
1564 ))),
1565 })
1566 }
1567 LogicalPlan::Explain(a) => {
1568 let input = LogicalPlanNode::try_from_logical_plan(
1569 a.plan.as_ref(),
1570 extension_codec,
1571 )?;
1572 Ok(LogicalPlanNode {
1573 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1574 protobuf::ExplainNode {
1575 input: Some(Box::new(input)),
1576 verbose: a.verbose,
1577 },
1578 ))),
1579 })
1580 }
1581 LogicalPlan::Union(union) => {
1582 let inputs: Vec<LogicalPlanNode> = union
1583 .inputs
1584 .iter()
1585 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1586 .collect::<Result<_>>()?;
1587 Ok(LogicalPlanNode {
1588 logical_plan_type: Some(LogicalPlanType::Union(
1589 protobuf::UnionNode { inputs },
1590 )),
1591 })
1592 }
1593 LogicalPlan::Extension(extension) => {
1594 let mut buf: Vec<u8> = vec![];
1595 extension_codec.try_encode(extension, &mut buf)?;
1596
1597 let inputs: Vec<LogicalPlanNode> = extension
1598 .node
1599 .inputs()
1600 .iter()
1601 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1602 .collect::<Result<_>>()?;
1603
1604 Ok(LogicalPlanNode {
1605 logical_plan_type: Some(LogicalPlanType::Extension(
1606 LogicalExtensionNode { node: buf, inputs },
1607 )),
1608 })
1609 }
1610 LogicalPlan::Statement(Statement::Prepare(Prepare {
1611 name,
1612 data_types,
1613 input,
1614 })) => {
1615 let input =
1616 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1617 Ok(LogicalPlanNode {
1618 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1619 protobuf::PrepareNode {
1620 name: name.clone(),
1621 data_types: data_types
1622 .iter()
1623 .map(|t| t.try_into())
1624 .collect::<Result<Vec<_>, _>>()?,
1625 input: Some(Box::new(input)),
1626 },
1627 ))),
1628 })
1629 }
1630 LogicalPlan::Unnest(Unnest {
1631 input,
1632 exec_columns,
1633 list_type_columns,
1634 struct_type_columns,
1635 dependency_indices,
1636 schema,
1637 options,
1638 }) => {
1639 let input =
1640 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1641 let proto_unnest_list_items = list_type_columns
1642 .iter()
1643 .map(|(index, ul)| ColumnUnnestListItem {
1644 input_index: *index as _,
1645 recursion: Some(ColumnUnnestListRecursion {
1646 output_column: Some(ul.output_column.to_owned().into()),
1647 depth: ul.depth as _,
1648 }),
1649 })
1650 .collect();
1651 Ok(LogicalPlanNode {
1652 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1653 protobuf::UnnestNode {
1654 input: Some(Box::new(input)),
1655 exec_columns: exec_columns
1656 .iter()
1657 .map(|col| col.into())
1658 .collect(),
1659 list_type_columns: proto_unnest_list_items,
1660 struct_type_columns: struct_type_columns
1661 .iter()
1662 .map(|c| *c as u64)
1663 .collect(),
1664 dependency_indices: dependency_indices
1665 .iter()
1666 .map(|c| *c as u64)
1667 .collect(),
1668 schema: Some(schema.try_into()?),
1669 options: Some(options.into()),
1670 },
1671 ))),
1672 })
1673 }
1674 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1675 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1676 )),
1677 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1678 "LogicalPlan serde is not yet implemented for CreateIndex",
1679 )),
1680 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1681 "LogicalPlan serde is not yet implemented for DropTable",
1682 )),
1683 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1684 name,
1685 if_exists,
1686 schema,
1687 })) => Ok(LogicalPlanNode {
1688 logical_plan_type: Some(LogicalPlanType::DropView(
1689 protobuf::DropViewNode {
1690 name: Some(name.clone().into()),
1691 if_exists: *if_exists,
1692 schema: Some(schema.try_into()?),
1693 },
1694 )),
1695 }),
1696 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1697 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1698 )),
1699 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1700 "LogicalPlan serde is not yet implemented for CreateFunction",
1701 )),
1702 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1703 "LogicalPlan serde is not yet implemented for DropFunction",
1704 )),
1705 LogicalPlan::Statement(_) => Err(proto_error(
1706 "LogicalPlan serde is not yet implemented for Statement",
1707 )),
1708 LogicalPlan::Dml(DmlStatement {
1709 table_name,
1710 target,
1711 op,
1712 input,
1713 ..
1714 }) => {
1715 let input =
1716 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1717 let dml_type: dml_node::Type = op.into();
1718 Ok(LogicalPlanNode {
1719 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1720 input: Some(Box::new(input)),
1721 target: Some(Box::new(from_table_source(
1722 table_name.clone(),
1723 Arc::clone(target),
1724 extension_codec,
1725 )?)),
1726 table_name: Some(table_name.clone().into()),
1727 dml_type: dml_type.into(),
1728 }))),
1729 })
1730 }
1731 LogicalPlan::Copy(dml::CopyTo {
1732 input,
1733 output_url,
1734 file_type,
1735 partition_by,
1736 ..
1737 }) => {
1738 let input =
1739 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1740 let mut buf = Vec::new();
1741 extension_codec
1742 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1743
1744 Ok(LogicalPlanNode {
1745 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1746 protobuf::CopyToNode {
1747 input: Some(Box::new(input)),
1748 output_url: output_url.to_string(),
1749 file_type: buf,
1750 partition_by: partition_by.clone(),
1751 },
1752 ))),
1753 })
1754 }
1755 LogicalPlan::DescribeTable(_) => Err(proto_error(
1756 "LogicalPlan serde is not yet implemented for DescribeTable",
1757 )),
1758 LogicalPlan::RecursiveQuery(recursive) => {
1759 let static_term = LogicalPlanNode::try_from_logical_plan(
1760 recursive.static_term.as_ref(),
1761 extension_codec,
1762 )?;
1763 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1764 recursive.recursive_term.as_ref(),
1765 extension_codec,
1766 )?;
1767
1768 Ok(LogicalPlanNode {
1769 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1770 protobuf::RecursiveQueryNode {
1771 name: recursive.name.clone(),
1772 static_term: Some(Box::new(static_term)),
1773 recursive_term: Some(Box::new(recursive_term)),
1774 is_distinct: recursive.is_distinct,
1775 },
1776 ))),
1777 })
1778 }
1779 }
1780 }
1781}