1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43 file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46 datasource::{
47 file_format::{
48 csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49 },
50 listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51 view::ViewTable,
52 TableProvider,
53 },
54 datasource::{provider_as_source, source_as_provider},
55 prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59 context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60 DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63 dml,
64 logical_plan::{
65 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68 SubqueryAlias, TableScan, Values, Window,
69 },
70 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71 Statement, WindowUDF,
72};
73use datafusion_expr::{
74 AggregateUDF, ColumnUnnestList, DmlStatement, FetchType, RecursiveQuery, SkipType,
75 TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
/// Bidirectional conversion between a serializable plan representation and a
/// DataFusion [`LogicalPlan`].
///
/// Implementors (such as [`LogicalPlanNode`]) act as the wire format: they can
/// be encoded to / decoded from bytes, and converted to / from an in-memory
/// `LogicalPlan` with the help of a [`LogicalExtensionCodec`] for any custom
/// nodes.
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    /// Decode an instance of this representation from a byte buffer.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encode this representation into the provided byte buffer.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Convert this representation into a [`LogicalPlan`], resolving tables
    /// and functions through `ctx` and custom nodes through `extension_codec`.
    fn try_into_logical_plan(
        &self,
        ctx: &SessionContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan>;

    /// Build this representation from a [`LogicalPlan`], serializing custom
    /// nodes through `extension_codec`.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
110
/// Codec for (de)serializing the parts of a plan that the built-in protobuf
/// schema cannot describe: user-defined extension nodes, custom table
/// providers, custom file formats, and user-defined functions.
///
/// The UDF/UDAF/UDWF and file-format hooks have default implementations:
/// the `try_encode_*` variants succeed as no-ops, while the `try_decode_*`
/// variants return a "not implemented" error — so a codec only has to
/// override the hooks it actually supports.
pub trait LogicalExtensionCodec: Debug + Send + Sync {
    /// Decode a user-defined [`Extension`] node from `buf`; `inputs` are the
    /// already-deserialized child plans.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[LogicalPlan],
        ctx: &SessionContext,
    ) -> Result<Extension>;

    /// Encode a user-defined [`Extension`] node into `buf`.
    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;

    /// Decode a custom [`TableProvider`] previously written by
    /// [`Self::try_encode_table_provider`].
    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        table_ref: &TableReference,
        schema: SchemaRef,
        ctx: &SessionContext,
    ) -> Result<Arc<dyn TableProvider>>;

    /// Encode a custom [`TableProvider`] into `buf`.
    fn try_encode_table_provider(
        &self,
        table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<()>;

    /// Decode a custom [`FileFormatFactory`]; errors unless overridden.
    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &SessionContext,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        not_impl_err!("LogicalExtensionCodec is not provided for file format")
    }

    /// Encode a custom [`FileFormatFactory`]; a no-op unless overridden.
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> Result<()> {
        Ok(())
    }

    /// Decode a scalar UDF by `name`; errors unless overridden.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encode a scalar UDF; a no-op unless overridden.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decode an aggregate UDF by `name`; errors unless overridden.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "LogicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encode an aggregate UDF; a no-op unless overridden.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decode a window UDF by `name`; errors unless overridden.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
    }

    /// Encode a window UDF; a no-op unless overridden.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
178
/// A [`LogicalExtensionCodec`] for plans that contain no custom extension
/// nodes or custom table providers: every required codec method returns a
/// "not implemented" error (see the `impl` below).
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}
181
// All four required methods fail uniformly: this codec exists only so plain
// plans (no extensions, no custom providers) can be serialized without the
// caller supplying a real codec. The trait's default no-op encode hooks for
// UDFs and file formats still apply.
impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[LogicalPlan],
        _ctx: &SessionContext,
    ) -> Result<Extension> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: SchemaRef,
        _ctx: &SessionContext,
    ) -> Result<Arc<dyn TableProvider>> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }
}
215
/// Deserialize an optional child plan field (`Option<Box<LogicalPlanNode>>`)
/// into a [`LogicalPlan`], erroring if the field is absent.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(field) => field.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err(proto_error("Missing required field in protobuf")),
        }
    }};
}
226
227fn from_table_reference(
228 table_ref: Option<&protobuf::TableReference>,
229 error_context: &str,
230) -> Result<TableReference> {
231 let table_ref = table_ref.ok_or_else(|| {
232 DataFusionError::Internal(format!(
233 "Protobuf deserialization error, {error_context} was missing required field name."
234 ))
235 })?;
236
237 Ok(table_ref.clone().try_into()?)
238}
239
240fn to_table_source(
244 node: &Option<Box<LogicalPlanNode>>,
245 ctx: &SessionContext,
246 extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248 if let Some(node) = node {
249 match node.try_into_logical_plan(ctx, extension_codec)? {
250 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251 _ => plan_err!("expected TableScan node"),
252 }
253 } else {
254 plan_err!("LogicalPlanNode should be provided")
255 }
256}
257
258fn from_table_source(
262 table_name: TableReference,
263 target: Arc<dyn TableSource>,
264 extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266 let projected_schema = target.schema().to_dfschema_ref()?;
267 let r = LogicalPlan::TableScan(TableScan {
268 table_name,
269 source: target,
270 projection: None,
271 projected_schema,
272 filters: vec![],
273 fetch: None,
274 });
275
276 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280 fn try_decode(buf: &[u8]) -> Result<Self>
281 where
282 Self: Sized,
283 {
284 LogicalPlanNode::decode(buf).map_err(|e| {
285 DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286 })
287 }
288
289 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290 where
291 B: BufMut,
292 Self: Sized,
293 {
294 self.encode(buf).map_err(|e| {
295 DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296 })
297 }
298
    /// Convert this protobuf node back into a DataFusion [`LogicalPlan`].
    ///
    /// `ctx` supplies the session state needed during deserialization (e.g.
    /// table factories, runtime caches) and `extension_codec` decodes any
    /// embedded custom extension nodes, table providers, and file formats.
    /// One match arm per `LogicalPlanType` variant.
    fn try_into_logical_plan(
        &self,
        ctx: &SessionContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan> {
        // A node without a `logical_plan_type` set is malformed.
        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
            ))
        })?;
        match plan {
            LogicalPlanType::Values(values) => {
                // Values are serialized as a flat expression list; rebuild
                // rows by chunking into groups of `n_cols`.
                let n_cols = values.n_cols as usize;
                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
                    Ok(Vec::new())
                } else if values.values_list.len() % n_cols != 0 {
                    internal_err!(
                        "Invalid values list length, expect {} to be divisible by {}",
                        values.values_list.len(),
                        n_cols
                    )
                } else {
                    values
                        .values_list
                        .chunks_exact(n_cols)
                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
                        .collect::<Result<Vec<_>, _>>()
                        .map_err(|e| e.into())
                }?;

                LogicalPlanBuilder::values(values)?.build()
            }
            LogicalPlanType::Projection(projection) => {
                let input: LogicalPlan =
                    into_logical_plan!(projection.input, ctx, extension_codec)?;
                let expr: Vec<Expr> =
                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;

                let new_proj = project(input, expr)?;
                // A serialized alias re-wraps the projection in SubqueryAlias.
                match projection.optional_alias.as_ref() {
                    Some(a) => match a {
                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
                                Arc::new(new_proj),
                                alias.clone(),
                            )?))
                        }
                    },
                    _ => Ok(new_proj),
                }
            }
            LogicalPlanType::Selection(selection) => {
                let input: LogicalPlan =
                    into_logical_plan!(selection.input, ctx, extension_codec)?;
                // The filter predicate is required; absence is a proto error.
                let expr: Expr = selection
                    .expr
                    .as_ref()
                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
                    .transpose()?
                    .ok_or_else(|| proto_error("expression required"))?;
                LogicalPlanBuilder::from(input).filter(expr)?.build()
            }
            LogicalPlanType::Window(window) => {
                let input: LogicalPlan =
                    into_logical_plan!(window.input, ctx, extension_codec)?;
                let window_expr =
                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input).window(window_expr)?.build()
            }
            LogicalPlanType::Aggregate(aggregate) => {
                let input: LogicalPlan =
                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
                let group_expr =
                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
                let aggr_expr =
                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .aggregate(group_expr, aggr_expr)?
                    .build()
            }
            LogicalPlanType::ListingScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;

                // Projection is serialized by column name; map back to indices
                // against the deserialized schema.
                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let filters =
                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

                let mut all_sort_orders = vec![];
                for order in &scan.file_sort_order {
                    all_sort_orders.push(from_proto::parse_sorts(
                        &order.sort_expr_nodes,
                        ctx,
                        extension_codec,
                    )?)
                }

                // Reconstruct the concrete file format. Parquet and Avro are
                // feature-gated: when the feature is disabled the match arm
                // still exists but panics at runtime if such a plan arrives.
                let file_format: Arc<dyn FileFormat> =
                    match scan.file_format_type.as_ref().ok_or_else(|| {
                        proto_error(format!(
                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
                        ))
                    })? {
                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
                            #[cfg(feature = "parquet")]
                            {
                                let mut parquet = ParquetFormat::default();
                                if let Some(options) = options {
                                    parquet = parquet.with_options(options.try_into()?)
                                }
                                Arc::new(parquet)
                            }
                            #[cfg(not(feature = "parquet"))]
                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
                        }
                        FileFormatType::Csv(protobuf::CsvFormat {
                            options
                        }) => {
                            let mut csv = CsvFormat::default();
                            if let Some(options) = options {
                                csv = csv.with_options(options.try_into()?)
                            }
                            Arc::new(csv)
                        },
                        FileFormatType::Json(protobuf::NdJsonFormat {
                            options
                        }) => {
                            let mut json = OtherNdJsonFormat::default();
                            if let Some(options) = options {
                                json = json.with_options(options.try_into()?)
                            }
                            Arc::new(json)
                        }
                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
                        FileFormatType::Avro(..) => {
                            #[cfg(feature = "avro")]
                            {
                                Arc::new(AvroFormat)
                            }
                            #[cfg(not(feature = "avro"))]
                            panic!("Unable to process avro file since `avro` feature is not enabled");
                        }
                    };

                let table_paths = &scan
                    .paths
                    .iter()
                    .map(ListingTableUrl::parse)
                    .collect::<Result<Vec<_>, _>>()?;

                // Partition columns carry their Arrow type; both the name and
                // type must round-trip.
                let partition_columns = scan
                    .table_partition_cols
                    .iter()
                    .map(|col| {
                        let Some(arrow_type) = col.arrow_type.as_ref() else {
                            return Err(proto_error(
                                "Missing Arrow type in partition columns",
                            ));
                        };
                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
                            proto_error(format!("Received an unknown ArrowType: {e}"))
                        })?;
                        Ok((col.name.clone(), arrow_type))
                    })
                    .collect::<Result<Vec<_>>>()?;

                let options = ListingOptions::new(file_format)
                    .with_file_extension(&scan.file_extension)
                    .with_table_partition_cols(partition_columns)
                    .with_collect_stat(scan.collect_stat)
                    .with_target_partitions(scan.target_partitions as usize)
                    .with_file_sort_order(all_sort_orders);

                let config =
                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
                        .with_listing_options(options)
                        .with_schema(Arc::new(schema));

                // Reattach the session's file-statistics cache so re-planned
                // scans can reuse cached stats.
                let provider = ListingTable::try_new(config)?.with_cache(
                    ctx.state()
                        .runtime_env()
                        .cache_manager
                        .get_file_statistic_cache(),
                );

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;

                LogicalPlanBuilder::scan_with_filters(
                    table_name,
                    provider_as_source(Arc::new(provider)),
                    projection,
                    filters,
                )?
                .build()
            }
            LogicalPlanType::CustomScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;
                let schema = Arc::new(schema);
                // Projection is serialized by column name; map back to indices.
                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let filters =
                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;

                // The provider itself is opaque to this crate; delegate to the
                // extension codec.
                let provider = extension_codec.try_decode_table_provider(
                    &scan.custom_table_data,
                    &table_name,
                    schema,
                    ctx,
                )?;

                LogicalPlanBuilder::scan_with_filters(
                    table_name,
                    provider_as_source(provider),
                    projection,
                    filters,
                )?
                .build()
            }
            LogicalPlanType::Sort(sort) => {
                let input: LogicalPlan =
                    into_logical_plan!(sort.input, ctx, extension_codec)?;
                let sort_expr: Vec<SortExpr> =
                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
                // A negative serialized fetch means "no limit" (try_into fails).
                let fetch: Option<usize> = sort.fetch.try_into().ok();
                LogicalPlanBuilder::from(input)
                    .sort_with_limit(sort_expr, fetch)?
                    .build()
            }
            LogicalPlanType::Repartition(repartition) => {
                use datafusion::logical_expr::Partitioning;
                let input: LogicalPlan =
                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
                use protobuf::repartition_node::PartitionMethod;
                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
                    internal_datafusion_err!(
                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
                    )
                })?;

                let partitioning_scheme = match pb_partition_method {
                    PartitionMethod::Hash(protobuf::HashRepartition {
                        hash_expr: pb_hash_expr,
                        partition_count,
                    }) => Partitioning::Hash(
                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
                        *partition_count as usize,
                    ),
                    PartitionMethod::RoundRobin(partition_count) => {
                        Partitioning::RoundRobinBatch(*partition_count as usize)
                    }
                };

                LogicalPlanBuilder::from(input)
                    .repartition(partitioning_scheme)?
                    .build()
            }
            LogicalPlanType::EmptyRelation(empty_relation) => {
                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
            }
            LogicalPlanType::CreateExternalTable(create_extern_table) => {
                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
                    ))
                })?;

                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
                    ))
                })?;
                // An empty definition string means "no definition".
                let definition = if !create_extern_table.definition.is_empty() {
                    Some(create_extern_table.definition.clone())
                } else {
                    None
                };

                // Fail early if this session has no factory registered for the
                // table's file type.
                let file_type = create_extern_table.file_type.as_str();
                if ctx.table_factory(file_type).is_none() {
                    internal_err!("No TableProviderFactory for file type: {file_type}")?
                }

                let mut order_exprs = vec![];
                for expr in &create_extern_table.order_exprs {
                    order_exprs.push(from_proto::parse_sorts(
                        &expr.sort_expr_nodes,
                        ctx,
                        extension_codec,
                    )?);
                }

                let mut column_defaults =
                    HashMap::with_capacity(create_extern_table.column_defaults.len());
                for (col_name, expr) in &create_extern_table.column_defaults {
                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
                    column_defaults.insert(col_name.clone(), expr);
                }

                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                    CreateExternalTable {
                        schema: pb_schema.try_into()?,
                        name: from_table_reference(
                            create_extern_table.name.as_ref(),
                            "CreateExternalTable",
                        )?,
                        location: create_extern_table.location.clone(),
                        file_type: create_extern_table.file_type.clone(),
                        table_partition_cols: create_extern_table
                            .table_partition_cols
                            .clone(),
                        order_exprs,
                        if_not_exists: create_extern_table.if_not_exists,
                        temporary: create_extern_table.temporary,
                        definition,
                        unbounded: create_extern_table.unbounded,
                        options: create_extern_table.options.clone(),
                        constraints: constraints.into(),
                        column_defaults,
                    },
                )))
            }
            LogicalPlanType::CreateView(create_view) => {
                let plan = create_view
                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;
                // An empty definition string means "no definition".
                let definition = if !create_view.definition.is_empty() {
                    Some(create_view.definition.clone())
                } else {
                    None
                };

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
                    temporary: create_view.temporary,
                    input: Arc::new(plan),
                    or_replace: create_view.or_replace,
                    definition,
                })))
            }
            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
                    ))
                })?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                    CreateCatalogSchema {
                        schema_name: create_catalog_schema.schema_name.clone(),
                        if_not_exists: create_catalog_schema.if_not_exists,
                        schema: pb_schema.try_into()?,
                    },
                )))
            }
            LogicalPlanType::CreateCatalog(create_catalog) => {
                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
                    ))
                })?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                    CreateCatalog {
                        catalog_name: create_catalog.catalog_name.clone(),
                        if_not_exists: create_catalog.if_not_exists,
                        schema: pb_schema.try_into()?,
                    },
                )))
            }
            // Analyze and Explain share the builder's `explain` entry point,
            // distinguished by the `analyze` boolean (second argument).
            LogicalPlanType::Analyze(analyze) => {
                let input: LogicalPlan =
                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .explain(analyze.verbose, true)?
                    .build()
            }
            LogicalPlanType::Explain(explain) => {
                let input: LogicalPlan =
                    into_logical_plan!(explain.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .explain(explain.verbose, false)?
                    .build()
            }
            LogicalPlanType::SubqueryAlias(aliased_relation) => {
                let input: LogicalPlan =
                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
                let alias = from_table_reference(
                    aliased_relation.alias.as_ref(),
                    "SubqueryAlias",
                )?;
                LogicalPlanBuilder::from(input).alias(alias)?.build()
            }
            LogicalPlanType::Limit(limit) => {
                let input: LogicalPlan =
                    into_logical_plan!(limit.input, ctx, extension_codec)?;
                // Negative skip is clamped to 0; negative fetch means "no limit".
                let skip = limit.skip.max(0) as usize;

                let fetch = if limit.fetch < 0 {
                    None
                } else {
                    Some(limit.fetch as usize)
                };

                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
            }
            LogicalPlanType::Join(join) => {
                let left_keys: Vec<Expr> =
                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
                let right_keys: Vec<Expr> =
                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
                let join_type =
                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
                        proto_error(format!(
                            "Received a JoinNode message with unknown JoinType {}",
                            join.join_type
                        ))
                    })?;
                let join_constraint = protobuf::JoinConstraint::try_from(
                    join.join_constraint,
                )
                .map_err(|_| {
                    proto_error(format!(
                        "Received a JoinNode message with unknown JoinConstraint {}",
                        join.join_constraint
                    ))
                })?;
                let filter: Option<Expr> = join
                    .filter
                    .as_ref()
                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
                    .map_or(Ok(None), |v| v.map(Some))?;

                let builder = LogicalPlanBuilder::from(into_logical_plan!(
                    join.left,
                    ctx,
                    extension_codec
                )?);
                let builder = match join_constraint.into() {
                    JoinConstraint::On => builder.join_with_expr_keys(
                        into_logical_plan!(join.right, ctx, extension_codec)?,
                        join_type.into(),
                        (left_keys, right_keys),
                        filter,
                    )?,
                    JoinConstraint::Using => {
                        // USING joins require plain column keys; anything else
                        // in the serialized key list is an internal error.
                        let using_keys = left_keys
                            .into_iter()
                            .map(|key| {
                                key.try_as_col().cloned()
                                    .ok_or_else(|| internal_datafusion_err!(
                                        "Using join keys must be column references, got: {key:?}"
                                    ))
                            })
                            .collect::<Result<Vec<_>, _>>()?;
                        builder.join_using(
                            into_logical_plan!(join.right, ctx, extension_codec)?,
                            join_type.into(),
                            using_keys,
                        )?
                    }
                };

                builder.build()
            }
            LogicalPlanType::Union(union) => {
                if union.inputs.len() < 2 {
                    return Err( DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, Union was require at least two input.",
                    )));
                }
                // Length checked above, so split_first cannot fail here.
                let (first, rest) = union.inputs.split_first().unwrap();
                let mut builder = LogicalPlanBuilder::from(
                    first.try_into_logical_plan(ctx, extension_codec)?,
                );

                for i in rest {
                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
                    builder = builder.union(plan)?;
                }
                builder.build()
            }
            LogicalPlanType::CrossJoin(crossjoin) => {
                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;

                LogicalPlanBuilder::from(left).cross_join(right)?.build()
            }
            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
                // Children are decoded first so the codec receives fully
                // materialized input plans.
                let input_plans: Vec<LogicalPlan> = inputs
                    .iter()
                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
                    .collect::<Result<_>>()?;

                let extension_node =
                    extension_codec.try_decode(node, &input_plans, ctx)?;
                Ok(LogicalPlan::Extension(extension_node))
            }
            LogicalPlanType::Distinct(distinct) => {
                let input: LogicalPlan =
                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input).distinct()?.build()
            }
            LogicalPlanType::DistinctOn(distinct_on) => {
                let input: LogicalPlan =
                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
                let on_expr =
                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
                let select_expr = from_proto::parse_exprs(
                    &distinct_on.select_expr,
                    ctx,
                    extension_codec,
                )?;
                // An empty sort list round-trips as `None`.
                let sort_expr = match distinct_on.sort_expr.len() {
                    0 => None,
                    _ => Some(from_proto::parse_sorts(
                        &distinct_on.sort_expr,
                        ctx,
                        extension_codec,
                    )?),
                };
                LogicalPlanBuilder::from(input)
                    .distinct_on(on_expr, select_expr, sort_expr)?
                    .build()
            }
            LogicalPlanType::ViewScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;

                // Projection is serialized by column name; map back to indices.
                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let input: LogicalPlan =
                    into_logical_plan!(scan.input, ctx, extension_codec)?;

                // An empty definition string means "no definition".
                let definition = if !scan.definition.is_empty() {
                    Some(scan.definition.clone())
                } else {
                    None
                };

                let provider = ViewTable::new(input, definition);

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;

                LogicalPlanBuilder::scan(
                    table_name,
                    provider_as_source(Arc::new(provider)),
                    projection,
                )?
                .build()
            }
            LogicalPlanType::Prepare(prepare) => {
                let input: LogicalPlan =
                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
                let data_types: Vec<DataType> = prepare
                    .data_types
                    .iter()
                    .map(DataType::try_from)
                    .collect::<Result<_, _>>()?;
                LogicalPlanBuilder::from(input)
                    .prepare(prepare.name.clone(), data_types)?
                    .build()
            }
            LogicalPlanType::DropView(dropview) => {
                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
                    if_exists: dropview.if_exists,
                    schema: Arc::new(convert_required!(dropview.schema)?),
                })))
            }
            LogicalPlanType::CopyTo(copy) => {
                let input: LogicalPlan =
                    into_logical_plan!(copy.input, ctx, extension_codec)?;

                // File formats may be custom, so decoding goes through the
                // extension codec.
                let file_type: Arc<dyn FileType> = format_as_file_type(
                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
                );

                Ok(LogicalPlan::Copy(dml::CopyTo {
                    input: Arc::new(input),
                    output_url: copy.output_url.clone(),
                    partition_by: copy.partition_by.clone(),
                    file_type,
                    options: Default::default(),
                }))
            }
            LogicalPlanType::Unnest(unnest) => {
                let input: LogicalPlan =
                    into_logical_plan!(unnest.input, ctx, extension_codec)?;
                Ok(LogicalPlan::Unnest(Unnest {
                    input: Arc::new(input),
                    exec_columns: unnest.exec_columns.iter().map(|c| c.into()).collect(),
                    list_type_columns: unnest
                        .list_type_columns
                        .iter()
                        .map(|c| {
                            // NOTE(review): `recursion` and its `output_column`
                            // are assumed to always be set by the serializer;
                            // a hand-crafted message would panic here.
                            let recursion_item = c.recursion.as_ref().unwrap();
                            (
                                c.input_index as _,
                                ColumnUnnestList {
                                    output_column: recursion_item
                                        .output_column
                                        .as_ref()
                                        .unwrap()
                                        .into(),
                                    depth: recursion_item.depth as _,
                                },
                            )
                        })
                        .collect(),
                    struct_type_columns: unnest
                        .struct_type_columns
                        .iter()
                        .map(|c| *c as usize)
                        .collect(),
                    dependency_indices: unnest
                        .dependency_indices
                        .iter()
                        .map(|c| *c as usize)
                        .collect(),
                    schema: Arc::new(convert_required!(unnest.schema)?),
                    options: into_required!(unnest.options)?,
                }))
            }
            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
                let static_term = recursive_query_node
                    .static_term
                    .as_ref()
                    .ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;

                let recursive_term = recursive_query_node
                    .recursive_term
                    .as_ref()
                    .ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;

                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: recursive_query_node.name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: recursive_query_node.is_distinct,
                }))
            }
            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
                let schema = convert_required!(*schema)?;
                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
                LogicalPlanBuilder::scan(
                    name.as_str(),
                    provider_as_source(Arc::new(cte_work_table)),
                    None,
                )?
                .build()
            }
            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
                datafusion::logical_expr::DmlStatement::new(
                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
                    to_table_source(&dml_node.target, ctx, extension_codec)?,
                    dml_node.dml_type().into(),
                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
                ),
            )),
        }
    }
999
1000 fn try_from_logical_plan(
1001 plan: &LogicalPlan,
1002 extension_codec: &dyn LogicalExtensionCodec,
1003 ) -> Result<Self>
1004 where
1005 Self: Sized,
1006 {
1007 match plan {
1008 LogicalPlan::Values(Values { values, .. }) => {
1009 let n_cols = if values.is_empty() {
1010 0
1011 } else {
1012 values[0].len()
1013 } as u64;
1014 let values_list =
1015 serialize_exprs(values.iter().flatten(), extension_codec)?;
1016 Ok(LogicalPlanNode {
1017 logical_plan_type: Some(LogicalPlanType::Values(
1018 protobuf::ValuesNode {
1019 n_cols,
1020 values_list,
1021 },
1022 )),
1023 })
1024 }
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                filters,
                projection,
                ..
            }) => {
                // Recover the concrete TableProvider so we can dispatch on its
                // runtime type below (listing table / view / CTE work table /
                // custom provider handled by the extension codec).
                let provider = source_as_provider(source)?;
                let schema = provider.schema();
                let source = provider.as_any();

                // The projection is serialized by column *name*, not index.
                let projection = match projection {
                    None => None,
                    Some(columns) => {
                        let column_names = columns
                            .iter()
                            .map(|i| schema.field(*i).name().to_owned())
                            .collect();
                        Some(protobuf::ProjectionColumns {
                            columns: column_names,
                        })
                    }
                };

                let filters: Vec<protobuf::LogicalExprNode> =
                    serialize_exprs(filters, extension_codec)?;

                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                    let any = listing_table.options().format.as_any();
                    // Probe each known file format in turn; the parquet and
                    // avro probes are feature-gated to mirror the imports.
                    let file_format_type = {
                        let mut maybe_some_type = None;

                        #[cfg(feature = "parquet")]
                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                            let options = parquet.options();
                            maybe_some_type =
                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                    options: Some(options.try_into()?),
                                }));
                        };

                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                            let options = csv.options();
                            maybe_some_type =
                                Some(FileFormatType::Csv(protobuf::CsvFormat {
                                    options: Some(options.try_into()?),
                                }));
                        }

                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                            let options = json.options();
                            maybe_some_type =
                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                    options: Some(options.try_into()?),
                                }))
                        }

                        #[cfg(feature = "avro")]
                        if any.is::<AvroFormat>() {
                            maybe_some_type =
                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                        }

                        // No probe matched: the format cannot be encoded.
                        if let Some(file_format_type) = maybe_some_type {
                            file_format_type
                        } else {
                            return Err(proto_error(format!(
                                "Error converting file format, {:?} is invalid as a datafusion format.",
                                listing_table.options().format
                            )));
                        }
                    };

                    let options = listing_table.options();

                    // Partition columns are serialized separately below, so
                    // strip them out of the file schema first. Iterating in
                    // reverse keeps the remaining indices valid while fields
                    // are being removed.
                    let mut builder = SchemaBuilder::from(schema.as_ref());
                    for (idx, field) in schema.fields().iter().enumerate().rev() {
                        if options
                            .table_partition_cols
                            .iter()
                            .any(|(name, _)| name == field.name())
                        {
                            builder.remove(idx);
                        }
                    }

                    let schema = builder.finish();

                    let schema: protobuf::Schema = (&schema).try_into()?;

                    // Each element of file_sort_order is one complete
                    // lexicographic ordering over the files.
                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                    for order in &options.file_sort_order {
                        let expr_vec = SortExprNodeCollection {
                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                        };
                        exprs_vec.push(expr_vec);
                    }

                    // Partition columns travel as (name, arrow type) pairs.
                    let partition_columns = options
                        .table_partition_cols
                        .iter()
                        .map(|(name, arrow_type)| {
                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
                                .map_err(|e| {
                                    proto_error(format!(
                                        "Received an unknown ArrowType: {e}"
                                    ))
                                })?;
                            Ok(protobuf::PartitionColumn {
                                name: name.clone(),
                                arrow_type: Some(arrow_type),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ListingScan(
                            protobuf::ListingTableScanNode {
                                file_format_type: Some(file_format_type),
                                table_name: Some(table_name.clone().into()),
                                collect_stat: options.collect_stat,
                                file_extension: options.file_extension.clone(),
                                table_partition_cols: partition_columns,
                                paths: listing_table
                                    .table_paths()
                                    .iter()
                                    .map(|x| x.to_string())
                                    .collect(),
                                schema: Some(schema),
                                projection,
                                filters,
                                target_partitions: options.target_partitions as u32,
                                file_sort_order: exprs_vec,
                            },
                        )),
                    })
                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                    // Views embed their defining logical plan (and the SQL
                    // definition text, when present).
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                            protobuf::ViewTableScanNode {
                                table_name: Some(table_name.clone().into()),
                                input: Some(Box::new(
                                    LogicalPlanNode::try_from_logical_plan(
                                        view_table.logical_plan(),
                                        extension_codec,
                                    )?,
                                )),
                                schema: Some(schema),
                                projection,
                                definition: view_table
                                    .definition()
                                    .map(|s| s.to_string())
                                    .unwrap_or_default(),
                            },
                        ))),
                    })
                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
                {
                    // Recursive-CTE work tables only need a name and schema.
                    let name = cte_work_table.name().to_string();
                    let schema = cte_work_table.schema();
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                            protobuf::CteWorkTableScanNode {
                                name,
                                schema: Some(schema),
                            },
                        )),
                    })
                } else {
                    // Unknown provider type: delegate its serialization to
                    // the user-supplied extension codec.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    let mut bytes = vec![];
                    extension_codec
                        .try_encode_table_provider(table_name, provider, &mut bytes)
                        .map_err(|e| context!("Error serializing custom table", e))?;
                    let scan = CustomScan(CustomTableScanNode {
                        table_name: Some(table_name.clone().into()),
                        projection,
                        schema: Some(schema),
                        filters,
                        custom_table_data: bytes,
                    });
                    let node = LogicalPlanNode {
                        logical_plan_type: Some(scan),
                    };
                    Ok(node)
                }
            }
1215 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1216 Ok(LogicalPlanNode {
1217 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1218 protobuf::ProjectionNode {
1219 input: Some(Box::new(
1220 LogicalPlanNode::try_from_logical_plan(
1221 input.as_ref(),
1222 extension_codec,
1223 )?,
1224 )),
1225 expr: serialize_exprs(expr, extension_codec)?,
1226 optional_alias: None,
1227 },
1228 ))),
1229 })
1230 }
1231 LogicalPlan::Filter(filter) => {
1232 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1233 filter.input.as_ref(),
1234 extension_codec,
1235 )?;
1236 Ok(LogicalPlanNode {
1237 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1238 protobuf::SelectionNode {
1239 input: Some(Box::new(input)),
1240 expr: Some(serialize_expr(
1241 &filter.predicate,
1242 extension_codec,
1243 )?),
1244 },
1245 ))),
1246 })
1247 }
1248 LogicalPlan::Distinct(Distinct::All(input)) => {
1249 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1250 input.as_ref(),
1251 extension_codec,
1252 )?;
1253 Ok(LogicalPlanNode {
1254 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1255 protobuf::DistinctNode {
1256 input: Some(Box::new(input)),
1257 },
1258 ))),
1259 })
1260 }
1261 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1262 on_expr,
1263 select_expr,
1264 sort_expr,
1265 input,
1266 ..
1267 })) => {
1268 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1269 input.as_ref(),
1270 extension_codec,
1271 )?;
1272 let sort_expr = match sort_expr {
1273 None => vec![],
1274 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1275 };
1276 Ok(LogicalPlanNode {
1277 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1278 protobuf::DistinctOnNode {
1279 on_expr: serialize_exprs(on_expr, extension_codec)?,
1280 select_expr: serialize_exprs(select_expr, extension_codec)?,
1281 sort_expr,
1282 input: Some(Box::new(input)),
1283 },
1284 ))),
1285 })
1286 }
1287 LogicalPlan::Window(Window {
1288 input, window_expr, ..
1289 }) => {
1290 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1291 input.as_ref(),
1292 extension_codec,
1293 )?;
1294 Ok(LogicalPlanNode {
1295 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1296 protobuf::WindowNode {
1297 input: Some(Box::new(input)),
1298 window_expr: serialize_exprs(window_expr, extension_codec)?,
1299 },
1300 ))),
1301 })
1302 }
1303 LogicalPlan::Aggregate(Aggregate {
1304 group_expr,
1305 aggr_expr,
1306 input,
1307 ..
1308 }) => {
1309 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1310 input.as_ref(),
1311 extension_codec,
1312 )?;
1313 Ok(LogicalPlanNode {
1314 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1315 protobuf::AggregateNode {
1316 input: Some(Box::new(input)),
1317 group_expr: serialize_exprs(group_expr, extension_codec)?,
1318 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1319 },
1320 ))),
1321 })
1322 }
1323 LogicalPlan::Join(Join {
1324 left,
1325 right,
1326 on,
1327 filter,
1328 join_type,
1329 join_constraint,
1330 null_equals_null,
1331 ..
1332 }) => {
1333 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1334 left.as_ref(),
1335 extension_codec,
1336 )?;
1337 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1338 right.as_ref(),
1339 extension_codec,
1340 )?;
1341 let (left_join_key, right_join_key) = on
1342 .iter()
1343 .map(|(l, r)| {
1344 Ok((
1345 serialize_expr(l, extension_codec)?,
1346 serialize_expr(r, extension_codec)?,
1347 ))
1348 })
1349 .collect::<Result<Vec<_>, ToProtoError>>()?
1350 .into_iter()
1351 .unzip();
1352 let join_type: protobuf::JoinType = join_type.to_owned().into();
1353 let join_constraint: protobuf::JoinConstraint =
1354 join_constraint.to_owned().into();
1355 let filter = filter
1356 .as_ref()
1357 .map(|e| serialize_expr(e, extension_codec))
1358 .map_or(Ok(None), |v| v.map(Some))?;
1359 Ok(LogicalPlanNode {
1360 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1361 protobuf::JoinNode {
1362 left: Some(Box::new(left)),
1363 right: Some(Box::new(right)),
1364 join_type: join_type.into(),
1365 join_constraint: join_constraint.into(),
1366 left_join_key,
1367 right_join_key,
1368 null_equals_null: *null_equals_null,
1369 filter,
1370 },
1371 ))),
1372 })
1373 }
            LogicalPlan::Subquery(_) => {
                // Subquery plans have no protobuf representation yet.
                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                // An alias node carries only its child plan plus the alias.
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
                        protobuf::SubqueryAliasNode {
                            input: Some(Box::new(input)),
                            alias: Some((*alias).clone().into()),
                        },
                    ))),
                })
            }
            LogicalPlan::Limit(limit) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    limit.input.as_ref(),
                    extension_codec,
                )?;
                // Only literal (constant) OFFSET/LIMIT values can be encoded;
                // expression-valued skip/fetch are rejected explicitly.
                let SkipType::Literal(skip) = limit.get_skip_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal skip values",
                    ));
                };
                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal fetch values",
                    ));
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
                        protobuf::LimitNode {
                            input: Some(Box::new(input)),
                            skip: skip as i64,
                            // "No fetch limit" is encoded with the sentinel
                            // i64::MAX rather than a separate optional field.
                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
                        },
                    ))),
                })
            }
1417 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1418 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1419 input.as_ref(),
1420 extension_codec,
1421 )?;
1422 let sort_expr: Vec<protobuf::SortExprNode> =
1423 serialize_sorts(expr, extension_codec)?;
1424 Ok(LogicalPlanNode {
1425 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1426 protobuf::SortNode {
1427 input: Some(Box::new(input)),
1428 expr: sort_expr,
1429 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1430 },
1431 ))),
1432 })
1433 }
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => {
                use datafusion::logical_expr::Partitioning;
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;

                use protobuf::repartition_node::PartitionMethod;

                // Hash and round-robin partitioning are serializable;
                // DistributeBy has no protobuf encoding yet.
                let pb_partition_method = match partitioning_scheme {
                    Partitioning::Hash(exprs, partition_count) => {
                        PartitionMethod::Hash(protobuf::HashRepartition {
                            hash_expr: serialize_exprs(exprs, extension_codec)?,
                            partition_count: *partition_count as u64,
                        })
                    }
                    Partitioning::RoundRobinBatch(partition_count) => {
                        PartitionMethod::RoundRobin(*partition_count as u64)
                    }
                    Partitioning::DistributeBy(_) => {
                        return not_impl_err!("DistributeBy")
                    }
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
                        protobuf::RepartitionNode {
                            input: Some(Box::new(input)),
                            partition_method: Some(pb_partition_method),
                        },
                    ))),
                })
            }
1472 LogicalPlan::EmptyRelation(EmptyRelation {
1473 produce_one_row, ..
1474 }) => Ok(LogicalPlanNode {
1475 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1476 protobuf::EmptyRelationNode {
1477 produce_one_row: *produce_one_row,
1478 },
1479 )),
1480 }),
            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                CreateExternalTable {
                    name,
                    location,
                    file_type,
                    schema: df_schema,
                    table_partition_cols,
                    if_not_exists,
                    definition,
                    order_exprs,
                    unbounded,
                    options,
                    constraints,
                    column_defaults,
                    temporary,
                },
            )) => {
                // Each entry of order_exprs is one complete sort ordering.
                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
                for order in order_exprs {
                    let temp = SortExprNodeCollection {
                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                    };
                    converted_order_exprs.push(temp);
                }

                // Column defaults serialize as name -> expression pairs.
                let mut converted_column_defaults =
                    HashMap::with_capacity(column_defaults.len());
                for (col_name, expr) in column_defaults {
                    converted_column_defaults
                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
                }

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                        protobuf::CreateExternalTableNode {
                            name: Some(name.clone().into()),
                            location: location.clone(),
                            file_type: file_type.clone(),
                            schema: Some(df_schema.try_into()?),
                            table_partition_cols: table_partition_cols.clone(),
                            if_not_exists: *if_not_exists,
                            temporary: *temporary,
                            order_exprs: converted_order_exprs,
                            definition: definition.clone().unwrap_or_default(),
                            unbounded: *unbounded,
                            options: options.clone(),
                            constraints: Some(constraints.clone().into()),
                            column_defaults: converted_column_defaults,
                        },
                    )),
                })
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                input,
                or_replace,
                definition,
                temporary,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
                    protobuf::CreateViewNode {
                        name: Some(name.clone().into()),
                        // The view's defining plan is serialized recursively.
                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
                            input,
                            extension_codec,
                        )?)),
                        or_replace: *or_replace,
                        temporary: *temporary,
                        definition: definition.clone().unwrap_or_default(),
                    },
                ))),
            }),
            // Catalog/schema DDL carries only names, flags, and the schema.
            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name,
                    if_not_exists,
                    schema: df_schema,
                },
            )) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
                    protobuf::CreateCatalogSchemaNode {
                        schema_name: schema_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
                catalog_name,
                if_not_exists,
                schema: df_schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
                    protobuf::CreateCatalogNode {
                        catalog_name: catalog_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
1581 LogicalPlan::Analyze(a) => {
1582 let input = LogicalPlanNode::try_from_logical_plan(
1583 a.input.as_ref(),
1584 extension_codec,
1585 )?;
1586 Ok(LogicalPlanNode {
1587 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1588 protobuf::AnalyzeNode {
1589 input: Some(Box::new(input)),
1590 verbose: a.verbose,
1591 },
1592 ))),
1593 })
1594 }
1595 LogicalPlan::Explain(a) => {
1596 let input = LogicalPlanNode::try_from_logical_plan(
1597 a.plan.as_ref(),
1598 extension_codec,
1599 )?;
1600 Ok(LogicalPlanNode {
1601 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1602 protobuf::ExplainNode {
1603 input: Some(Box::new(input)),
1604 verbose: a.verbose,
1605 },
1606 ))),
1607 })
1608 }
            LogicalPlan::Union(union) => {
                // Serialize every input branch of the UNION.
                let inputs: Vec<LogicalPlanNode> = union
                    .inputs
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Union(
                        protobuf::UnionNode { inputs },
                    )),
                })
            }
            LogicalPlan::Extension(extension) => {
                // The extension node itself is opaque: the codec encodes it
                // into bytes, while child plans are serialized normally.
                let mut buf: Vec<u8> = vec![];
                extension_codec.try_encode(extension, &mut buf)?;

                let inputs: Vec<LogicalPlanNode> = extension
                    .node
                    .inputs()
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Extension(
                        LogicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
1638 LogicalPlan::Statement(Statement::Prepare(Prepare {
1639 name,
1640 data_types,
1641 input,
1642 })) => {
1643 let input =
1644 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1645 Ok(LogicalPlanNode {
1646 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1647 protobuf::PrepareNode {
1648 name: name.clone(),
1649 data_types: data_types
1650 .iter()
1651 .map(|t| t.try_into())
1652 .collect::<Result<Vec<_>, _>>()?,
1653 input: Some(Box::new(input)),
1654 },
1655 ))),
1656 })
1657 }
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                list_type_columns,
                struct_type_columns,
                dependency_indices,
                schema,
                options,
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                // List-typed unnest columns carry their recursion info
                // (output column + unnest depth) alongside the input index.
                let proto_unnest_list_items = list_type_columns
                    .iter()
                    .map(|(index, ul)| ColumnUnnestListItem {
                        input_index: *index as _,
                        recursion: Some(ColumnUnnestListRecursion {
                            output_column: Some(ul.output_column.to_owned().into()),
                            depth: ul.depth as _,
                        }),
                    })
                    .collect();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
                        protobuf::UnnestNode {
                            input: Some(Box::new(input)),
                            exec_columns: exec_columns
                                .iter()
                                .map(|col| col.into())
                                .collect(),
                            list_type_columns: proto_unnest_list_items,
                            // Struct columns and dependency indices are plain
                            // index lists, widened to u64 for the wire format.
                            struct_type_columns: struct_type_columns
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            dependency_indices: dependency_indices
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            schema: Some(schema.try_into()?),
                            options: Some(options.into()),
                        },
                    ))),
                })
            }
            // The DDL variants below have no protobuf representation yet and
            // are rejected explicitly so the failure is a clear serde error.
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateIndex",
            )),
            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropTable",
            )),
            // DropView is the one drop statement that is serializable.
            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                name,
                if_exists,
                schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::DropView(
                    protobuf::DropViewNode {
                        name: Some(name.clone().into()),
                        if_exists: *if_exists,
                        schema: Some(schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateFunction",
            )),
            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropFunction",
            )),
            // Statements other than Prepare (handled above) are unsupported.
            LogicalPlan::Statement(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for Statement",
            )),
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                input,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                // The DML operation kind (insert/update/delete/...) maps onto
                // the protobuf dml_node::Type enum.
                let dml_type: dml_node::Type = op.into();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
                        input: Some(Box::new(input)),
                        // The target provider is serialized like a scan source.
                        target: Some(Box::new(from_table_source(
                            table_name.clone(),
                            Arc::clone(target),
                            extension_codec,
                        )?)),
                        table_name: Some(table_name.clone().into()),
                        dml_type: dml_type.into(),
                    }))),
                })
            }
            LogicalPlan::Copy(dml::CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                // The output file format is opaque to the proto schema; the
                // extension codec encodes it into a raw byte buffer.
                let mut buf = Vec::new();
                extension_codec
                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
                        protobuf::CopyToNode {
                            input: Some(Box::new(input)),
                            output_url: output_url.to_string(),
                            file_type: buf,
                            partition_by: partition_by.clone(),
                        },
                    ))),
                })
            }
            LogicalPlan::DescribeTable(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DescribeTable",
            )),
            LogicalPlan::RecursiveQuery(recursive) => {
                // A recursive CTE serializes both terms independently.
                let static_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.static_term.as_ref(),
                    extension_codec,
                )?;
                let recursive_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.recursive_term.as_ref(),
                    extension_codec,
                )?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
                        protobuf::RecursiveQueryNode {
                            name: recursive.name.clone(),
                            static_term: Some(Box::new(static_term)),
                            recursive_term: Some(Box::new(recursive_term)),
                            is_distinct: recursive.is_distinct,
                        },
                    ))),
                })
            }
1807 }
1808 }
1809}