1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_common::file_options::file_type::FileType;
39use datafusion_common::{
40 context, internal_datafusion_err, internal_err, not_impl_err, plan_err, Result,
41 TableReference, ToDFSchema,
42};
43use datafusion_datasource::file_format::FileFormat;
44use datafusion_datasource::file_format::{
45 file_type_to_format, format_as_file_type, FileFormatFactory,
46};
47use datafusion_datasource_arrow::file_format::ArrowFormat;
48#[cfg(feature = "avro")]
49use datafusion_datasource_avro::file_format::AvroFormat;
50use datafusion_datasource_csv::file_format::CsvFormat;
51use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
52#[cfg(feature = "parquet")]
53use datafusion_datasource_parquet::file_format::ParquetFormat;
54use datafusion_expr::{
55 dml,
56 logical_plan::{
57 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
58 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
59 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
60 SubqueryAlias, TableScan, Values, Window,
61 },
62 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
63 Statement, WindowUDF,
64};
65use datafusion_expr::{
66 AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
67};
68
69use self::to_proto::{serialize_expr, serialize_exprs};
70use crate::logical_plan::to_proto::serialize_sorts;
71use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
72use datafusion_catalog::view::ViewTable;
73use datafusion_catalog::TableProvider;
74use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
75use datafusion_datasource::ListingTableUrl;
76use datafusion_execution::TaskContext;
77use prost::bytes::BufMut;
78use prost::Message;
79
80pub mod file_formats;
81pub mod from_proto;
82pub mod to_proto;
83
/// A serializable representation of a DataFusion [`LogicalPlan`].
///
/// Implementors can be turned into / read from raw bytes and converted to and
/// from a [`LogicalPlan`]. Anything the representation does not handle
/// natively is delegated to a [`LogicalExtensionCodec`].
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance from its serialized byte form.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Appends the serialized byte form of this instance to `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this serialized representation back into a [`LogicalPlan`].
    ///
    /// `ctx` and `extension_codec` are used to resolve parts of the plan that
    /// are not encoded natively.
    fn try_into_logical_plan(
        &self,
        ctx: &TaskContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan>;

    /// Builds the serialized representation of `plan`, using
    /// `extension_codec` for parts that are not encoded natively.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
107
/// User-supplied hooks for (de)serializing the parts of a logical plan that
/// the protobuf format does not encode natively: extension nodes, custom
/// table providers, file formats and user-defined functions.
///
/// The file-format and UDF hooks have default implementations. The decode
/// side returns a "not implemented" error. The encode side writes nothing and
/// returns `Ok(())`.
pub trait LogicalExtensionCodec: Debug + Send + Sync {
    /// Reconstructs an [`Extension`] node from `buf`, given its already
    /// decoded child plans in `inputs`.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[LogicalPlan],
        ctx: &TaskContext,
    ) -> Result<Extension>;

    /// Serializes the extension `node` into `buf`.
    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;

    /// Reconstructs a custom table provider from `buf`.
    ///
    /// `table_ref` and `schema` are the table name and schema that were
    /// serialized alongside the provider's bytes.
    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        table_ref: &TableReference,
        schema: SchemaRef,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn TableProvider>>;

    /// Serializes the table provider `node`, registered as `table_ref`, into `buf`.
    fn try_encode_table_provider(
        &self,
        table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<()>;

    /// Reconstructs a file format factory from `buf` (used when decoding
    /// `COPY TO` plans).
    ///
    /// Default: returns a "not implemented" error.
    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        not_impl_err!("LogicalExtensionCodec is not provided for file format")
    }

    /// Serializes a file format factory into `buf`.
    ///
    /// Default: writes nothing and succeeds.
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> Result<()> {
        Ok(())
    }

    /// Resolves the scalar UDF called `name`, optionally using the bytes in `_buf`.
    ///
    /// Default: returns a "not implemented" error naming the function.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Serializes any extra state of a scalar UDF into `_buf`.
    ///
    /// Default: writes nothing and succeeds.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Resolves the aggregate UDF called `name`, optionally using the bytes in `_buf`.
    ///
    /// Default: returns a "not implemented" error naming the function.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "LogicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Serializes any extra state of an aggregate UDF into `_buf`.
    ///
    /// Default: writes nothing and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Resolves the window UDF called `name`, optionally using the bytes in `_buf`.
    ///
    /// Default: returns a "not implemented" error naming the function.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
    }

    /// Serializes any extra state of a window UDF into `_buf`.
    ///
    /// Default: writes nothing and succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
175
176#[derive(Debug, Clone)]
177pub struct DefaultLogicalExtensionCodec {}
178
179impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
180 fn try_decode(
181 &self,
182 _buf: &[u8],
183 _inputs: &[LogicalPlan],
184 _ctx: &TaskContext,
185 ) -> Result<Extension> {
186 not_impl_err!("LogicalExtensionCodec is not provided")
187 }
188
189 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
190 not_impl_err!("LogicalExtensionCodec is not provided")
191 }
192
193 fn try_decode_table_provider(
194 &self,
195 _buf: &[u8],
196 _table_ref: &TableReference,
197 _schema: SchemaRef,
198 _ctx: &TaskContext,
199 ) -> Result<Arc<dyn TableProvider>> {
200 not_impl_err!("LogicalExtensionCodec is not provided")
201 }
202
203 fn try_encode_table_provider(
204 &self,
205 _table_ref: &TableReference,
206 _node: Arc<dyn TableProvider>,
207 _buf: &mut Vec<u8>,
208 ) -> Result<()> {
209 not_impl_err!("LogicalExtensionCodec is not provided")
210 }
211}
212
/// Decodes a required child-plan field of a protobuf node into a `LogicalPlan`.
///
/// `$PB` is an `Option` whose payload is borrowed with `as_ref()` and then
/// decoded with `try_into_logical_plan($CTX, $CODEC)`. The caller must have
/// that method in scope, e.g. via the `AsLogicalPlan` trait. A `None` field
/// produces a protobuf error instead of a panic.
///
/// The macro is exported, so its error constructor is referenced through
/// `$crate`. Call sites therefore do not need `proto_error` imported.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
223
224fn from_table_reference(
225 table_ref: Option<&protobuf::TableReference>,
226 error_context: &str,
227) -> Result<TableReference> {
228 let table_ref = table_ref.ok_or_else(|| {
229 internal_datafusion_err!(
230 "Protobuf deserialization error, {error_context} was missing required field name."
231 )
232 })?;
233
234 Ok(table_ref.clone().try_into()?)
235}
236
237fn to_table_source(
241 node: &Option<Box<LogicalPlanNode>>,
242 ctx: &TaskContext,
243 extension_codec: &dyn LogicalExtensionCodec,
244) -> Result<Arc<dyn TableSource>> {
245 if let Some(node) = node {
246 match node.try_into_logical_plan(ctx, extension_codec)? {
247 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
248 _ => plan_err!("expected TableScan node"),
249 }
250 } else {
251 plan_err!("LogicalPlanNode should be provided")
252 }
253}
254
255fn from_table_source(
259 table_name: TableReference,
260 target: Arc<dyn TableSource>,
261 extension_codec: &dyn LogicalExtensionCodec,
262) -> Result<LogicalPlanNode> {
263 let projected_schema = target.schema().to_dfschema_ref()?;
264 let r = LogicalPlan::TableScan(TableScan {
265 table_name,
266 source: target,
267 projection: None,
268 projected_schema,
269 filters: vec![],
270 fetch: None,
271 });
272
273 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
274}
275
276impl AsLogicalPlan for LogicalPlanNode {
277 fn try_decode(buf: &[u8]) -> Result<Self>
278 where
279 Self: Sized,
280 {
281 LogicalPlanNode::decode(buf)
282 .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
283 }
284
285 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
286 where
287 B: BufMut,
288 Self: Sized,
289 {
290 self.encode(buf)
291 .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
292 }
293
294 fn try_into_logical_plan(
295 &self,
296 ctx: &TaskContext,
297 extension_codec: &dyn LogicalExtensionCodec,
298 ) -> Result<LogicalPlan> {
299 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
300 proto_error(format!(
301 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
302 ))
303 })?;
304 match plan {
305 LogicalPlanType::Values(values) => {
306 let n_cols = values.n_cols as usize;
307 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
308 Ok(Vec::new())
309 } else if values.values_list.len() % n_cols != 0 {
310 internal_err!(
311 "Invalid values list length, expect {} to be divisible by {}",
312 values.values_list.len(),
313 n_cols
314 )
315 } else {
316 values
317 .values_list
318 .chunks_exact(n_cols)
319 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
320 .collect::<Result<Vec<_>, _>>()
321 .map_err(|e| e.into())
322 }?;
323
324 LogicalPlanBuilder::values(values)?.build()
325 }
326 LogicalPlanType::Projection(projection) => {
327 let input: LogicalPlan =
328 into_logical_plan!(projection.input, ctx, extension_codec)?;
329 let expr: Vec<Expr> =
330 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
331
332 let new_proj = project(input, expr)?;
333 match projection.optional_alias.as_ref() {
334 Some(a) => match a {
335 protobuf::projection_node::OptionalAlias::Alias(alias) => {
336 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
337 Arc::new(new_proj),
338 alias.clone(),
339 )?))
340 }
341 },
342 _ => Ok(new_proj),
343 }
344 }
345 LogicalPlanType::Selection(selection) => {
346 let input: LogicalPlan =
347 into_logical_plan!(selection.input, ctx, extension_codec)?;
348 let expr: Expr = selection
349 .expr
350 .as_ref()
351 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
352 .transpose()?
353 .ok_or_else(|| proto_error("expression required"))?;
354 LogicalPlanBuilder::from(input).filter(expr)?.build()
355 }
356 LogicalPlanType::Window(window) => {
357 let input: LogicalPlan =
358 into_logical_plan!(window.input, ctx, extension_codec)?;
359 let window_expr =
360 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
361 LogicalPlanBuilder::from(input).window(window_expr)?.build()
362 }
363 LogicalPlanType::Aggregate(aggregate) => {
364 let input: LogicalPlan =
365 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
366 let group_expr =
367 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
368 let aggr_expr =
369 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
370 LogicalPlanBuilder::from(input)
371 .aggregate(group_expr, aggr_expr)?
372 .build()
373 }
374 LogicalPlanType::ListingScan(scan) => {
375 let schema: Schema = convert_required!(scan.schema)?;
376
377 let filters =
378 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
379
380 let mut all_sort_orders = vec![];
381 for order in &scan.file_sort_order {
382 all_sort_orders.push(from_proto::parse_sorts(
383 &order.sort_expr_nodes,
384 ctx,
385 extension_codec,
386 )?)
387 }
388
389 let file_format: Arc<dyn FileFormat> =
390 match scan.file_format_type.as_ref().ok_or_else(|| {
391 proto_error(format!(
392 "logical_plan::from_proto() Unsupported file format '{self:?}'"
393 ))
394 })? {
395 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
396 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
397 #[cfg(feature = "parquet")]
398 {
399 let mut parquet = ParquetFormat::default();
400 if let Some(options) = options {
401 parquet = parquet.with_options(options.try_into()?)
402 }
403 Arc::new(parquet)
404 }
405 #[cfg(not(feature = "parquet"))]
406 panic!("Unable to process parquet file since `parquet` feature is not enabled");
407 }
408 FileFormatType::Csv(protobuf::CsvFormat {
409 options
410 }) => {
411 let mut csv = CsvFormat::default();
412 if let Some(options) = options {
413 csv = csv.with_options(options.try_into()?)
414 }
415 Arc::new(csv)
416 },
417 FileFormatType::Json(protobuf::NdJsonFormat {
418 options
419 }) => {
420 let mut json = OtherNdJsonFormat::default();
421 if let Some(options) = options {
422 json = json.with_options(options.try_into()?)
423 }
424 Arc::new(json)
425 }
426 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
427 FileFormatType::Avro(..) => {
428 #[cfg(feature = "avro")]
429 {
430 Arc::new(AvroFormat)
431 }
432 #[cfg(not(feature = "avro"))]
433 panic!("Unable to process avro file since `avro` feature is not enabled");
434 }
435 FileFormatType::Arrow(..) => {
436 Arc::new(ArrowFormat)
437 }
438 };
439
440 let table_paths = &scan
441 .paths
442 .iter()
443 .map(ListingTableUrl::parse)
444 .collect::<Result<Vec<_>, _>>()?;
445
446 let partition_columns = scan
447 .table_partition_cols
448 .iter()
449 .map(|col| {
450 let Some(arrow_type) = col.arrow_type.as_ref() else {
451 return Err(proto_error(
452 "Missing Arrow type in partition columns",
453 ));
454 };
455 let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
456 proto_error(format!("Received an unknown ArrowType: {e}"))
457 })?;
458 Ok((col.name.clone(), arrow_type))
459 })
460 .collect::<Result<Vec<_>>>()?;
461
462 let options = ListingOptions::new(file_format)
463 .with_file_extension(&scan.file_extension)
464 .with_table_partition_cols(partition_columns)
465 .with_collect_stat(scan.collect_stat)
466 .with_target_partitions(scan.target_partitions as usize)
467 .with_file_sort_order(all_sort_orders);
468
469 let config =
470 ListingTableConfig::new_with_multi_paths(table_paths.clone())
471 .with_listing_options(options)
472 .with_schema(Arc::new(schema));
473
474 let provider = ListingTable::try_new(config)?.with_cache(
475 ctx.runtime_env().cache_manager.get_file_statistic_cache(),
476 );
477
478 let table_name =
479 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
480
481 let mut projection = None;
482 if let Some(columns) = &scan.projection {
483 let column_indices = columns
484 .columns
485 .iter()
486 .map(|name| provider.schema().index_of(name))
487 .collect::<Result<Vec<usize>, _>>()?;
488 projection = Some(column_indices);
489 }
490
491 LogicalPlanBuilder::scan_with_filters(
492 table_name,
493 provider_as_source(Arc::new(provider)),
494 projection,
495 filters,
496 )?
497 .build()
498 }
499 LogicalPlanType::CustomScan(scan) => {
500 let schema: Schema = convert_required!(scan.schema)?;
501 let schema = Arc::new(schema);
502 let mut projection = None;
503 if let Some(columns) = &scan.projection {
504 let column_indices = columns
505 .columns
506 .iter()
507 .map(|name| schema.index_of(name))
508 .collect::<Result<Vec<usize>, _>>()?;
509 projection = Some(column_indices);
510 }
511
512 let filters =
513 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
514
515 let table_name =
516 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
517
518 let provider = extension_codec.try_decode_table_provider(
519 &scan.custom_table_data,
520 &table_name,
521 schema,
522 ctx,
523 )?;
524
525 LogicalPlanBuilder::scan_with_filters(
526 table_name,
527 provider_as_source(provider),
528 projection,
529 filters,
530 )?
531 .build()
532 }
533 LogicalPlanType::Sort(sort) => {
534 let input: LogicalPlan =
535 into_logical_plan!(sort.input, ctx, extension_codec)?;
536 let sort_expr: Vec<SortExpr> =
537 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
538 let fetch: Option<usize> = sort.fetch.try_into().ok();
539 LogicalPlanBuilder::from(input)
540 .sort_with_limit(sort_expr, fetch)?
541 .build()
542 }
543 LogicalPlanType::Repartition(repartition) => {
544 use datafusion_expr::Partitioning;
545 let input: LogicalPlan =
546 into_logical_plan!(repartition.input, ctx, extension_codec)?;
547 use protobuf::repartition_node::PartitionMethod;
548 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
549 internal_datafusion_err!(
550 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
551 )
552 })?;
553
554 let partitioning_scheme = match pb_partition_method {
555 PartitionMethod::Hash(protobuf::HashRepartition {
556 hash_expr: pb_hash_expr,
557 partition_count,
558 }) => Partitioning::Hash(
559 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
560 *partition_count as usize,
561 ),
562 PartitionMethod::RoundRobin(partition_count) => {
563 Partitioning::RoundRobinBatch(*partition_count as usize)
564 }
565 };
566
567 LogicalPlanBuilder::from(input)
568 .repartition(partitioning_scheme)?
569 .build()
570 }
571 LogicalPlanType::EmptyRelation(empty_relation) => {
572 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
573 }
574 LogicalPlanType::CreateExternalTable(create_extern_table) => {
575 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
576 internal_datafusion_err!(
577 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
578 )
579 })?;
580
581 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
582 internal_datafusion_err!(
583 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
584 )
585 })?;
586 let definition = if !create_extern_table.definition.is_empty() {
587 Some(create_extern_table.definition.clone())
588 } else {
589 None
590 };
591
592 let mut order_exprs = vec![];
593 for expr in &create_extern_table.order_exprs {
594 order_exprs.push(from_proto::parse_sorts(
595 &expr.sort_expr_nodes,
596 ctx,
597 extension_codec,
598 )?);
599 }
600
601 let mut column_defaults =
602 HashMap::with_capacity(create_extern_table.column_defaults.len());
603 for (col_name, expr) in &create_extern_table.column_defaults {
604 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
605 column_defaults.insert(col_name.clone(), expr);
606 }
607
608 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
609 CreateExternalTable {
610 schema: pb_schema.try_into()?,
611 name: from_table_reference(
612 create_extern_table.name.as_ref(),
613 "CreateExternalTable",
614 )?,
615 location: create_extern_table.location.clone(),
616 file_type: create_extern_table.file_type.clone(),
617 table_partition_cols: create_extern_table
618 .table_partition_cols
619 .clone(),
620 order_exprs,
621 if_not_exists: create_extern_table.if_not_exists,
622 or_replace: create_extern_table.or_replace,
623 temporary: create_extern_table.temporary,
624 definition,
625 unbounded: create_extern_table.unbounded,
626 options: create_extern_table.options.clone(),
627 constraints: constraints.into(),
628 column_defaults,
629 },
630 )))
631 }
632 LogicalPlanType::CreateView(create_view) => {
633 let plan = create_view
634 .input.clone().ok_or_else(|| internal_datafusion_err!(
635 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
636 ))?
637 .try_into_logical_plan(ctx, extension_codec)?;
638 let definition = if !create_view.definition.is_empty() {
639 Some(create_view.definition.clone())
640 } else {
641 None
642 };
643
644 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
645 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
646 temporary: create_view.temporary,
647 input: Arc::new(plan),
648 or_replace: create_view.or_replace,
649 definition,
650 })))
651 }
652 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
653 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
654 internal_datafusion_err!(
655 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
656 )
657 })?;
658
659 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
660 CreateCatalogSchema {
661 schema_name: create_catalog_schema.schema_name.clone(),
662 if_not_exists: create_catalog_schema.if_not_exists,
663 schema: pb_schema.try_into()?,
664 },
665 )))
666 }
667 LogicalPlanType::CreateCatalog(create_catalog) => {
668 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
669 internal_datafusion_err!(
670 "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
671 )
672 })?;
673
674 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
675 CreateCatalog {
676 catalog_name: create_catalog.catalog_name.clone(),
677 if_not_exists: create_catalog.if_not_exists,
678 schema: pb_schema.try_into()?,
679 },
680 )))
681 }
682 LogicalPlanType::Analyze(analyze) => {
683 let input: LogicalPlan =
684 into_logical_plan!(analyze.input, ctx, extension_codec)?;
685 LogicalPlanBuilder::from(input)
686 .explain(analyze.verbose, true)?
687 .build()
688 }
689 LogicalPlanType::Explain(explain) => {
690 let input: LogicalPlan =
691 into_logical_plan!(explain.input, ctx, extension_codec)?;
692 LogicalPlanBuilder::from(input)
693 .explain(explain.verbose, false)?
694 .build()
695 }
696 LogicalPlanType::SubqueryAlias(aliased_relation) => {
697 let input: LogicalPlan =
698 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
699 let alias = from_table_reference(
700 aliased_relation.alias.as_ref(),
701 "SubqueryAlias",
702 )?;
703 LogicalPlanBuilder::from(input).alias(alias)?.build()
704 }
705 LogicalPlanType::Limit(limit) => {
706 let input: LogicalPlan =
707 into_logical_plan!(limit.input, ctx, extension_codec)?;
708 let skip = limit.skip.max(0) as usize;
709
710 let fetch = if limit.fetch < 0 {
711 None
712 } else {
713 Some(limit.fetch as usize)
714 };
715
716 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
717 }
718 LogicalPlanType::Join(join) => {
719 let left_keys: Vec<Expr> =
720 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
721 let right_keys: Vec<Expr> =
722 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
723 let join_type =
724 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
725 proto_error(format!(
726 "Received a JoinNode message with unknown JoinType {}",
727 join.join_type
728 ))
729 })?;
730 let join_constraint = protobuf::JoinConstraint::try_from(
731 join.join_constraint,
732 )
733 .map_err(|_| {
734 proto_error(format!(
735 "Received a JoinNode message with unknown JoinConstraint {}",
736 join.join_constraint
737 ))
738 })?;
739 let filter: Option<Expr> = join
740 .filter
741 .as_ref()
742 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
743 .map_or(Ok(None), |v| v.map(Some))?;
744
745 let builder = LogicalPlanBuilder::from(into_logical_plan!(
746 join.left,
747 ctx,
748 extension_codec
749 )?);
750 let builder = match join_constraint.into() {
751 JoinConstraint::On => builder.join_with_expr_keys(
752 into_logical_plan!(join.right, ctx, extension_codec)?,
753 join_type.into(),
754 (left_keys, right_keys),
755 filter,
756 )?,
757 JoinConstraint::Using => {
758 let using_keys = left_keys
760 .into_iter()
761 .map(|key| {
762 key.try_as_col().cloned()
763 .ok_or_else(|| internal_datafusion_err!(
764 "Using join keys must be column references, got: {key:?}"
765 ))
766 })
767 .collect::<Result<Vec<_>, _>>()?;
768 builder.join_using(
769 into_logical_plan!(join.right, ctx, extension_codec)?,
770 join_type.into(),
771 using_keys,
772 )?
773 }
774 };
775
776 builder.build()
777 }
778 LogicalPlanType::Union(union) => {
779 if union.inputs.len() < 2 {
780 return internal_err!(
781 "Protobuf deserialization error, Union was require at least two input."
782 );
783 }
784 let (first, rest) = union.inputs.split_first().unwrap();
785 let mut builder = LogicalPlanBuilder::from(
786 first.try_into_logical_plan(ctx, extension_codec)?,
787 );
788
789 for i in rest {
790 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
791 builder = builder.union(plan)?;
792 }
793 builder.build()
794 }
795 LogicalPlanType::CrossJoin(crossjoin) => {
796 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
797 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
798
799 LogicalPlanBuilder::from(left).cross_join(right)?.build()
800 }
801 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
802 let input_plans: Vec<LogicalPlan> = inputs
803 .iter()
804 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
805 .collect::<Result<_>>()?;
806
807 let extension_node =
808 extension_codec.try_decode(node, &input_plans, ctx)?;
809 Ok(LogicalPlan::Extension(extension_node))
810 }
811 LogicalPlanType::Distinct(distinct) => {
812 let input: LogicalPlan =
813 into_logical_plan!(distinct.input, ctx, extension_codec)?;
814 LogicalPlanBuilder::from(input).distinct()?.build()
815 }
816 LogicalPlanType::DistinctOn(distinct_on) => {
817 let input: LogicalPlan =
818 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
819 let on_expr =
820 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
821 let select_expr = from_proto::parse_exprs(
822 &distinct_on.select_expr,
823 ctx,
824 extension_codec,
825 )?;
826 let sort_expr = match distinct_on.sort_expr.len() {
827 0 => None,
828 _ => Some(from_proto::parse_sorts(
829 &distinct_on.sort_expr,
830 ctx,
831 extension_codec,
832 )?),
833 };
834 LogicalPlanBuilder::from(input)
835 .distinct_on(on_expr, select_expr, sort_expr)?
836 .build()
837 }
838 LogicalPlanType::ViewScan(scan) => {
839 let schema: Schema = convert_required!(scan.schema)?;
840
841 let mut projection = None;
842 if let Some(columns) = &scan.projection {
843 let column_indices = columns
844 .columns
845 .iter()
846 .map(|name| schema.index_of(name))
847 .collect::<Result<Vec<usize>, _>>()?;
848 projection = Some(column_indices);
849 }
850
851 let input: LogicalPlan =
852 into_logical_plan!(scan.input, ctx, extension_codec)?;
853
854 let definition = if !scan.definition.is_empty() {
855 Some(scan.definition.clone())
856 } else {
857 None
858 };
859
860 let provider = ViewTable::new(input, definition);
861
862 let table_name =
863 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
864
865 LogicalPlanBuilder::scan(
866 table_name,
867 provider_as_source(Arc::new(provider)),
868 projection,
869 )?
870 .build()
871 }
872 LogicalPlanType::Prepare(prepare) => {
873 let input: LogicalPlan =
874 into_logical_plan!(prepare.input, ctx, extension_codec)?;
875 let data_types: Vec<DataType> = prepare
876 .data_types
877 .iter()
878 .map(DataType::try_from)
879 .collect::<Result<_, _>>()?;
880 let fields: Vec<Field> = prepare
881 .fields
882 .iter()
883 .map(Field::try_from)
884 .collect::<Result<_, _>>()?;
885
886 if fields.is_empty() {
890 LogicalPlanBuilder::from(input)
891 .prepare(
892 prepare.name.clone(),
893 data_types
894 .into_iter()
895 .map(|dt| Field::new("", dt, true).into())
896 .collect(),
897 )?
898 .build()
899 } else {
900 LogicalPlanBuilder::from(input)
901 .prepare(
902 prepare.name.clone(),
903 fields.into_iter().map(|f| f.into()).collect(),
904 )?
905 .build()
906 }
907 }
908 LogicalPlanType::DropView(dropview) => {
909 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
910 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
911 if_exists: dropview.if_exists,
912 schema: Arc::new(convert_required!(dropview.schema)?),
913 })))
914 }
915 LogicalPlanType::CopyTo(copy) => {
916 let input: LogicalPlan =
917 into_logical_plan!(copy.input, ctx, extension_codec)?;
918
919 let file_type: Arc<dyn FileType> = format_as_file_type(
920 extension_codec.try_decode_file_format(©.file_type, ctx)?,
921 );
922
923 Ok(LogicalPlan::Copy(dml::CopyTo::new(
924 Arc::new(input),
925 copy.output_url.clone(),
926 copy.partition_by.clone(),
927 file_type,
928 Default::default(),
929 )))
930 }
931 LogicalPlanType::Unnest(unnest) => {
932 let input: LogicalPlan =
933 into_logical_plan!(unnest.input, ctx, extension_codec)?;
934
935 LogicalPlanBuilder::from(input)
936 .unnest_columns_with_options(
937 unnest.exec_columns.iter().map(|c| c.into()).collect(),
938 into_required!(unnest.options)?,
939 )?
940 .build()
941 }
942 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
943 let static_term = recursive_query_node
944 .static_term
945 .as_ref()
946 .ok_or_else(|| internal_datafusion_err!(
947 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
948 ))?
949 .try_into_logical_plan(ctx, extension_codec)?;
950
951 let recursive_term = recursive_query_node
952 .recursive_term
953 .as_ref()
954 .ok_or_else(|| internal_datafusion_err!(
955 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
956 ))?
957 .try_into_logical_plan(ctx, extension_codec)?;
958
959 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
960 name: recursive_query_node.name.clone(),
961 static_term: Arc::new(static_term),
962 recursive_term: Arc::new(recursive_term),
963 is_distinct: recursive_query_node.is_distinct,
964 }))
965 }
966 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
967 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
968 let schema = convert_required!(*schema)?;
969 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
970 LogicalPlanBuilder::scan(
971 name.as_str(),
972 provider_as_source(Arc::new(cte_work_table)),
973 None,
974 )?
975 .build()
976 }
977 LogicalPlanType::Dml(dml_node) => {
978 Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
979 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
980 to_table_source(&dml_node.target, ctx, extension_codec)?,
981 dml_node.dml_type().into(),
982 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
983 )))
984 }
985 }
986 }
987
988 fn try_from_logical_plan(
989 plan: &LogicalPlan,
990 extension_codec: &dyn LogicalExtensionCodec,
991 ) -> Result<Self>
992 where
993 Self: Sized,
994 {
995 match plan {
996 LogicalPlan::Values(Values { values, .. }) => {
997 let n_cols = if values.is_empty() {
998 0
999 } else {
1000 values[0].len()
1001 } as u64;
1002 let values_list =
1003 serialize_exprs(values.iter().flatten(), extension_codec)?;
1004 Ok(LogicalPlanNode {
1005 logical_plan_type: Some(LogicalPlanType::Values(
1006 protobuf::ValuesNode {
1007 n_cols,
1008 values_list,
1009 },
1010 )),
1011 })
1012 }
1013 LogicalPlan::TableScan(TableScan {
1014 table_name,
1015 source,
1016 filters,
1017 projection,
1018 ..
1019 }) => {
1020 let provider = source_as_provider(source)?;
1021 let schema = provider.schema();
1022 let source = provider.as_any();
1023
1024 let projection = match projection {
1025 None => None,
1026 Some(columns) => {
1027 let column_names = columns
1028 .iter()
1029 .map(|i| schema.field(*i).name().to_owned())
1030 .collect();
1031 Some(protobuf::ProjectionColumns {
1032 columns: column_names,
1033 })
1034 }
1035 };
1036
1037 let filters: Vec<protobuf::LogicalExprNode> =
1038 serialize_exprs(filters, extension_codec)?;
1039
1040 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1041 let any = listing_table.options().format.as_any();
1042 let file_format_type = {
1043 let mut maybe_some_type = None;
1044
1045 #[cfg(feature = "parquet")]
1046 if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1047 let options = parquet.options();
1048 maybe_some_type =
1049 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1050 options: Some(options.try_into()?),
1051 }));
1052 };
1053
1054 if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1055 let options = csv.options();
1056 maybe_some_type =
1057 Some(FileFormatType::Csv(protobuf::CsvFormat {
1058 options: Some(options.try_into()?),
1059 }));
1060 }
1061
1062 if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1063 let options = json.options();
1064 maybe_some_type =
1065 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1066 options: Some(options.try_into()?),
1067 }))
1068 }
1069
1070 #[cfg(feature = "avro")]
1071 if any.is::<AvroFormat>() {
1072 maybe_some_type =
1073 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1074 }
1075
1076 if any.is::<ArrowFormat>() {
1077 maybe_some_type =
1078 Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1079 }
1080
1081 if let Some(file_format_type) = maybe_some_type {
1082 file_format_type
1083 } else {
1084 return Err(proto_error(format!(
1085 "Error deserializing unknown file format: {:?}",
1086 listing_table.options().format
1087 )));
1088 }
1089 };
1090
1091 let options = listing_table.options();
1092
1093 let mut builder = SchemaBuilder::from(schema.as_ref());
1094 for (idx, field) in schema.fields().iter().enumerate().rev() {
1095 if options
1096 .table_partition_cols
1097 .iter()
1098 .any(|(name, _)| name == field.name())
1099 {
1100 builder.remove(idx);
1101 }
1102 }
1103
1104 let schema = builder.finish();
1105
1106 let schema: protobuf::Schema = (&schema).try_into()?;
1107
1108 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1109 for order in &options.file_sort_order {
1110 let expr_vec = SortExprNodeCollection {
1111 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1112 };
1113 exprs_vec.push(expr_vec);
1114 }
1115
1116 let partition_columns = options
1117 .table_partition_cols
1118 .iter()
1119 .map(|(name, arrow_type)| {
1120 let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1121 .map_err(|e| {
1122 proto_error(format!(
1123 "Received an unknown ArrowType: {e}"
1124 ))
1125 })?;
1126 Ok(protobuf::PartitionColumn {
1127 name: name.clone(),
1128 arrow_type: Some(arrow_type),
1129 })
1130 })
1131 .collect::<Result<Vec<_>>>()?;
1132
1133 Ok(LogicalPlanNode {
1134 logical_plan_type: Some(LogicalPlanType::ListingScan(
1135 protobuf::ListingTableScanNode {
1136 file_format_type: Some(file_format_type),
1137 table_name: Some(table_name.clone().into()),
1138 collect_stat: options.collect_stat,
1139 file_extension: options.file_extension.clone(),
1140 table_partition_cols: partition_columns,
1141 paths: listing_table
1142 .table_paths()
1143 .iter()
1144 .map(|x| x.to_string())
1145 .collect(),
1146 schema: Some(schema),
1147 projection,
1148 filters,
1149 target_partitions: options.target_partitions as u32,
1150 file_sort_order: exprs_vec,
1151 },
1152 )),
1153 })
1154 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1155 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1156 Ok(LogicalPlanNode {
1157 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1158 protobuf::ViewTableScanNode {
1159 table_name: Some(table_name.clone().into()),
1160 input: Some(Box::new(
1161 LogicalPlanNode::try_from_logical_plan(
1162 view_table.logical_plan(),
1163 extension_codec,
1164 )?,
1165 )),
1166 schema: Some(schema),
1167 projection,
1168 definition: view_table
1169 .definition()
1170 .map(|s| s.to_string())
1171 .unwrap_or_default(),
1172 },
1173 ))),
1174 })
1175 } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1176 {
1177 let name = cte_work_table.name().to_string();
1178 let schema = cte_work_table.schema();
1179 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1180
1181 Ok(LogicalPlanNode {
1182 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1183 protobuf::CteWorkTableScanNode {
1184 name,
1185 schema: Some(schema),
1186 },
1187 )),
1188 })
1189 } else {
1190 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1191 let mut bytes = vec![];
1192 extension_codec
1193 .try_encode_table_provider(table_name, provider, &mut bytes)
1194 .map_err(|e| context!("Error serializing custom table", e))?;
1195 let scan = CustomScan(CustomTableScanNode {
1196 table_name: Some(table_name.clone().into()),
1197 projection,
1198 schema: Some(schema),
1199 filters,
1200 custom_table_data: bytes,
1201 });
1202 let node = LogicalPlanNode {
1203 logical_plan_type: Some(scan),
1204 };
1205 Ok(node)
1206 }
1207 }
1208 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1209 Ok(LogicalPlanNode {
1210 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1211 protobuf::ProjectionNode {
1212 input: Some(Box::new(
1213 LogicalPlanNode::try_from_logical_plan(
1214 input.as_ref(),
1215 extension_codec,
1216 )?,
1217 )),
1218 expr: serialize_exprs(expr, extension_codec)?,
1219 optional_alias: None,
1220 },
1221 ))),
1222 })
1223 }
1224 LogicalPlan::Filter(filter) => {
1225 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1226 filter.input.as_ref(),
1227 extension_codec,
1228 )?;
1229 Ok(LogicalPlanNode {
1230 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1231 protobuf::SelectionNode {
1232 input: Some(Box::new(input)),
1233 expr: Some(serialize_expr(
1234 &filter.predicate,
1235 extension_codec,
1236 )?),
1237 },
1238 ))),
1239 })
1240 }
1241 LogicalPlan::Distinct(Distinct::All(input)) => {
1242 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1243 input.as_ref(),
1244 extension_codec,
1245 )?;
1246 Ok(LogicalPlanNode {
1247 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1248 protobuf::DistinctNode {
1249 input: Some(Box::new(input)),
1250 },
1251 ))),
1252 })
1253 }
1254 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1255 on_expr,
1256 select_expr,
1257 sort_expr,
1258 input,
1259 ..
1260 })) => {
1261 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1262 input.as_ref(),
1263 extension_codec,
1264 )?;
1265 let sort_expr = match sort_expr {
1266 None => vec![],
1267 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1268 };
1269 Ok(LogicalPlanNode {
1270 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1271 protobuf::DistinctOnNode {
1272 on_expr: serialize_exprs(on_expr, extension_codec)?,
1273 select_expr: serialize_exprs(select_expr, extension_codec)?,
1274 sort_expr,
1275 input: Some(Box::new(input)),
1276 },
1277 ))),
1278 })
1279 }
1280 LogicalPlan::Window(Window {
1281 input, window_expr, ..
1282 }) => {
1283 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1284 input.as_ref(),
1285 extension_codec,
1286 )?;
1287 Ok(LogicalPlanNode {
1288 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1289 protobuf::WindowNode {
1290 input: Some(Box::new(input)),
1291 window_expr: serialize_exprs(window_expr, extension_codec)?,
1292 },
1293 ))),
1294 })
1295 }
1296 LogicalPlan::Aggregate(Aggregate {
1297 group_expr,
1298 aggr_expr,
1299 input,
1300 ..
1301 }) => {
1302 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1303 input.as_ref(),
1304 extension_codec,
1305 )?;
1306 Ok(LogicalPlanNode {
1307 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1308 protobuf::AggregateNode {
1309 input: Some(Box::new(input)),
1310 group_expr: serialize_exprs(group_expr, extension_codec)?,
1311 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1312 },
1313 ))),
1314 })
1315 }
1316 LogicalPlan::Join(Join {
1317 left,
1318 right,
1319 on,
1320 filter,
1321 join_type,
1322 join_constraint,
1323 null_equality,
1324 ..
1325 }) => {
1326 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1327 left.as_ref(),
1328 extension_codec,
1329 )?;
1330 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1331 right.as_ref(),
1332 extension_codec,
1333 )?;
1334 let (left_join_key, right_join_key) = on
1335 .iter()
1336 .map(|(l, r)| {
1337 Ok((
1338 serialize_expr(l, extension_codec)?,
1339 serialize_expr(r, extension_codec)?,
1340 ))
1341 })
1342 .collect::<Result<Vec<_>, ToProtoError>>()?
1343 .into_iter()
1344 .unzip();
1345 let join_type: protobuf::JoinType = join_type.to_owned().into();
1346 let join_constraint: protobuf::JoinConstraint =
1347 join_constraint.to_owned().into();
1348 let null_equality: protobuf::NullEquality =
1349 null_equality.to_owned().into();
1350 let filter = filter
1351 .as_ref()
1352 .map(|e| serialize_expr(e, extension_codec))
1353 .map_or(Ok(None), |v| v.map(Some))?;
1354 Ok(LogicalPlanNode {
1355 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1356 protobuf::JoinNode {
1357 left: Some(Box::new(left)),
1358 right: Some(Box::new(right)),
1359 join_type: join_type.into(),
1360 join_constraint: join_constraint.into(),
1361 left_join_key,
1362 right_join_key,
1363 null_equality: null_equality.into(),
1364 filter,
1365 },
1366 ))),
1367 })
1368 }
1369 LogicalPlan::Subquery(_) => {
1370 not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1371 }
1372 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1373 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1374 input.as_ref(),
1375 extension_codec,
1376 )?;
1377 Ok(LogicalPlanNode {
1378 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1379 protobuf::SubqueryAliasNode {
1380 input: Some(Box::new(input)),
1381 alias: Some((*alias).clone().into()),
1382 },
1383 ))),
1384 })
1385 }
1386 LogicalPlan::Limit(limit) => {
1387 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1388 limit.input.as_ref(),
1389 extension_codec,
1390 )?;
1391 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1392 return Err(proto_error(
1393 "LogicalPlan::Limit only supports literal skip values",
1394 ));
1395 };
1396 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1397 return Err(proto_error(
1398 "LogicalPlan::Limit only supports literal fetch values",
1399 ));
1400 };
1401
1402 Ok(LogicalPlanNode {
1403 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1404 protobuf::LimitNode {
1405 input: Some(Box::new(input)),
1406 skip: skip as i64,
1407 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1408 },
1409 ))),
1410 })
1411 }
1412 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1413 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414 input.as_ref(),
1415 extension_codec,
1416 )?;
1417 let sort_expr: Vec<protobuf::SortExprNode> =
1418 serialize_sorts(expr, extension_codec)?;
1419 Ok(LogicalPlanNode {
1420 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1421 protobuf::SortNode {
1422 input: Some(Box::new(input)),
1423 expr: sort_expr,
1424 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1425 },
1426 ))),
1427 })
1428 }
1429 LogicalPlan::Repartition(Repartition {
1430 input,
1431 partitioning_scheme,
1432 }) => {
1433 use datafusion_expr::Partitioning;
1434 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1435 input.as_ref(),
1436 extension_codec,
1437 )?;
1438
1439 use protobuf::repartition_node::PartitionMethod;
1442
1443 let pb_partition_method = match partitioning_scheme {
1444 Partitioning::Hash(exprs, partition_count) => {
1445 PartitionMethod::Hash(protobuf::HashRepartition {
1446 hash_expr: serialize_exprs(exprs, extension_codec)?,
1447 partition_count: *partition_count as u64,
1448 })
1449 }
1450 Partitioning::RoundRobinBatch(partition_count) => {
1451 PartitionMethod::RoundRobin(*partition_count as u64)
1452 }
1453 Partitioning::DistributeBy(_) => {
1454 return not_impl_err!("DistributeBy")
1455 }
1456 };
1457
1458 Ok(LogicalPlanNode {
1459 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1460 protobuf::RepartitionNode {
1461 input: Some(Box::new(input)),
1462 partition_method: Some(pb_partition_method),
1463 },
1464 ))),
1465 })
1466 }
1467 LogicalPlan::EmptyRelation(EmptyRelation {
1468 produce_one_row, ..
1469 }) => Ok(LogicalPlanNode {
1470 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1471 protobuf::EmptyRelationNode {
1472 produce_one_row: *produce_one_row,
1473 },
1474 )),
1475 }),
1476 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1477 CreateExternalTable {
1478 name,
1479 location,
1480 file_type,
1481 schema: df_schema,
1482 table_partition_cols,
1483 if_not_exists,
1484 or_replace,
1485 definition,
1486 order_exprs,
1487 unbounded,
1488 options,
1489 constraints,
1490 column_defaults,
1491 temporary,
1492 },
1493 )) => {
1494 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1495 for order in order_exprs {
1496 let temp = SortExprNodeCollection {
1497 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1498 };
1499 converted_order_exprs.push(temp);
1500 }
1501
1502 let mut converted_column_defaults =
1503 HashMap::with_capacity(column_defaults.len());
1504 for (col_name, expr) in column_defaults {
1505 converted_column_defaults
1506 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1507 }
1508
1509 Ok(LogicalPlanNode {
1510 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1511 protobuf::CreateExternalTableNode {
1512 name: Some(name.clone().into()),
1513 location: location.clone(),
1514 file_type: file_type.clone(),
1515 schema: Some(df_schema.try_into()?),
1516 table_partition_cols: table_partition_cols.clone(),
1517 if_not_exists: *if_not_exists,
1518 or_replace: *or_replace,
1519 temporary: *temporary,
1520 order_exprs: converted_order_exprs,
1521 definition: definition.clone().unwrap_or_default(),
1522 unbounded: *unbounded,
1523 options: options.clone(),
1524 constraints: Some(constraints.clone().into()),
1525 column_defaults: converted_column_defaults,
1526 },
1527 )),
1528 })
1529 }
1530 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1531 name,
1532 input,
1533 or_replace,
1534 definition,
1535 temporary,
1536 })) => Ok(LogicalPlanNode {
1537 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1538 protobuf::CreateViewNode {
1539 name: Some(name.clone().into()),
1540 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1541 input,
1542 extension_codec,
1543 )?)),
1544 or_replace: *or_replace,
1545 temporary: *temporary,
1546 definition: definition.clone().unwrap_or_default(),
1547 },
1548 ))),
1549 }),
1550 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1551 CreateCatalogSchema {
1552 schema_name,
1553 if_not_exists,
1554 schema: df_schema,
1555 },
1556 )) => Ok(LogicalPlanNode {
1557 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1558 protobuf::CreateCatalogSchemaNode {
1559 schema_name: schema_name.clone(),
1560 if_not_exists: *if_not_exists,
1561 schema: Some(df_schema.try_into()?),
1562 },
1563 )),
1564 }),
1565 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1566 catalog_name,
1567 if_not_exists,
1568 schema: df_schema,
1569 })) => Ok(LogicalPlanNode {
1570 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1571 protobuf::CreateCatalogNode {
1572 catalog_name: catalog_name.clone(),
1573 if_not_exists: *if_not_exists,
1574 schema: Some(df_schema.try_into()?),
1575 },
1576 )),
1577 }),
1578 LogicalPlan::Analyze(a) => {
1579 let input = LogicalPlanNode::try_from_logical_plan(
1580 a.input.as_ref(),
1581 extension_codec,
1582 )?;
1583 Ok(LogicalPlanNode {
1584 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1585 protobuf::AnalyzeNode {
1586 input: Some(Box::new(input)),
1587 verbose: a.verbose,
1588 },
1589 ))),
1590 })
1591 }
1592 LogicalPlan::Explain(a) => {
1593 let input = LogicalPlanNode::try_from_logical_plan(
1594 a.plan.as_ref(),
1595 extension_codec,
1596 )?;
1597 Ok(LogicalPlanNode {
1598 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1599 protobuf::ExplainNode {
1600 input: Some(Box::new(input)),
1601 verbose: a.verbose,
1602 },
1603 ))),
1604 })
1605 }
1606 LogicalPlan::Union(union) => {
1607 let inputs: Vec<LogicalPlanNode> = union
1608 .inputs
1609 .iter()
1610 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1611 .collect::<Result<_>>()?;
1612 Ok(LogicalPlanNode {
1613 logical_plan_type: Some(LogicalPlanType::Union(
1614 protobuf::UnionNode { inputs },
1615 )),
1616 })
1617 }
1618 LogicalPlan::Extension(extension) => {
1619 let mut buf: Vec<u8> = vec![];
1620 extension_codec.try_encode(extension, &mut buf)?;
1621
1622 let inputs: Vec<LogicalPlanNode> = extension
1623 .node
1624 .inputs()
1625 .iter()
1626 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1627 .collect::<Result<_>>()?;
1628
1629 Ok(LogicalPlanNode {
1630 logical_plan_type: Some(LogicalPlanType::Extension(
1631 LogicalExtensionNode { node: buf, inputs },
1632 )),
1633 })
1634 }
1635 LogicalPlan::Statement(Statement::Prepare(Prepare {
1636 name,
1637 fields,
1638 input,
1639 })) => {
1640 let input =
1641 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1642 Ok(LogicalPlanNode {
1643 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1644 protobuf::PrepareNode {
1645 name: name.clone(),
1646 input: Some(Box::new(input)),
1647 data_types: fields
1649 .iter()
1650 .map(|f| f.data_type().try_into())
1651 .collect::<Result<Vec<_>, _>>()?,
1652 fields: fields
1654 .iter()
1655 .map(|f| f.as_ref().try_into())
1656 .collect::<Result<Vec<_>, _>>()?,
1657 },
1658 ))),
1659 })
1660 }
1661 LogicalPlan::Unnest(Unnest {
1662 input,
1663 exec_columns,
1664 list_type_columns,
1665 struct_type_columns,
1666 dependency_indices,
1667 schema,
1668 options,
1669 }) => {
1670 let input =
1671 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1672 let proto_unnest_list_items = list_type_columns
1673 .iter()
1674 .map(|(index, ul)| ColumnUnnestListItem {
1675 input_index: *index as _,
1676 recursion: Some(ColumnUnnestListRecursion {
1677 output_column: Some(ul.output_column.to_owned().into()),
1678 depth: ul.depth as _,
1679 }),
1680 })
1681 .collect();
1682 Ok(LogicalPlanNode {
1683 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1684 protobuf::UnnestNode {
1685 input: Some(Box::new(input)),
1686 exec_columns: exec_columns
1687 .iter()
1688 .map(|col| col.into())
1689 .collect(),
1690 list_type_columns: proto_unnest_list_items,
1691 struct_type_columns: struct_type_columns
1692 .iter()
1693 .map(|c| *c as u64)
1694 .collect(),
1695 dependency_indices: dependency_indices
1696 .iter()
1697 .map(|c| *c as u64)
1698 .collect(),
1699 schema: Some(schema.try_into()?),
1700 options: Some(options.into()),
1701 },
1702 ))),
1703 })
1704 }
1705 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1706 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1707 )),
1708 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1709 "LogicalPlan serde is not yet implemented for CreateIndex",
1710 )),
1711 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1712 "LogicalPlan serde is not yet implemented for DropTable",
1713 )),
1714 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1715 name,
1716 if_exists,
1717 schema,
1718 })) => Ok(LogicalPlanNode {
1719 logical_plan_type: Some(LogicalPlanType::DropView(
1720 protobuf::DropViewNode {
1721 name: Some(name.clone().into()),
1722 if_exists: *if_exists,
1723 schema: Some(schema.try_into()?),
1724 },
1725 )),
1726 }),
1727 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1728 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1729 )),
1730 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1731 "LogicalPlan serde is not yet implemented for CreateFunction",
1732 )),
1733 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1734 "LogicalPlan serde is not yet implemented for DropFunction",
1735 )),
1736 LogicalPlan::Statement(_) => Err(proto_error(
1737 "LogicalPlan serde is not yet implemented for Statement",
1738 )),
1739 LogicalPlan::Dml(DmlStatement {
1740 table_name,
1741 target,
1742 op,
1743 input,
1744 ..
1745 }) => {
1746 let input =
1747 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1748 let dml_type: dml_node::Type = op.into();
1749 Ok(LogicalPlanNode {
1750 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1751 input: Some(Box::new(input)),
1752 target: Some(Box::new(from_table_source(
1753 table_name.clone(),
1754 Arc::clone(target),
1755 extension_codec,
1756 )?)),
1757 table_name: Some(table_name.clone().into()),
1758 dml_type: dml_type.into(),
1759 }))),
1760 })
1761 }
1762 LogicalPlan::Copy(dml::CopyTo {
1763 input,
1764 output_url,
1765 file_type,
1766 partition_by,
1767 ..
1768 }) => {
1769 let input =
1770 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1771 let mut buf = Vec::new();
1772 extension_codec
1773 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1774
1775 Ok(LogicalPlanNode {
1776 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1777 protobuf::CopyToNode {
1778 input: Some(Box::new(input)),
1779 output_url: output_url.to_string(),
1780 file_type: buf,
1781 partition_by: partition_by.clone(),
1782 },
1783 ))),
1784 })
1785 }
1786 LogicalPlan::DescribeTable(_) => Err(proto_error(
1787 "LogicalPlan serde is not yet implemented for DescribeTable",
1788 )),
1789 LogicalPlan::RecursiveQuery(recursive) => {
1790 let static_term = LogicalPlanNode::try_from_logical_plan(
1791 recursive.static_term.as_ref(),
1792 extension_codec,
1793 )?;
1794 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1795 recursive.recursive_term.as_ref(),
1796 extension_codec,
1797 )?;
1798
1799 Ok(LogicalPlanNode {
1800 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1801 protobuf::RecursiveQueryNode {
1802 name: recursive.name.clone(),
1803 static_term: Some(Box::new(static_term)),
1804 recursive_term: Some(Box::new(recursive_term)),
1805 is_distinct: recursive.is_distinct,
1806 },
1807 ))),
1808 })
1809 }
1810 }
1811 }
1812}