1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38use datafusion::datasource::file_format::arrow::ArrowFormat;
39#[cfg(feature = "avro")]
40use datafusion::datasource::file_format::avro::AvroFormat;
41#[cfg(feature = "parquet")]
42use datafusion::datasource::file_format::parquet::ParquetFormat;
43use datafusion::datasource::file_format::{
44 file_type_to_format, format_as_file_type, FileFormatFactory,
45};
46use datafusion::{
47 datasource::{
48 file_format::{
49 csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
50 },
51 listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
52 view::ViewTable,
53 TableProvider,
54 },
55 datasource::{provider_as_source, source_as_provider},
56 prelude::SessionContext,
57};
58use datafusion_common::file_options::file_type::FileType;
59use datafusion_common::{
60 context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
61 DataFusionError, Result, TableReference, ToDFSchema,
62};
63use datafusion_expr::{
64 dml,
65 logical_plan::{
66 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
67 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
68 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
69 SubqueryAlias, TableScan, Values, Window,
70 },
71 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
72 Statement, WindowUDF,
73};
74use datafusion_expr::{
75 AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
76};
77
78use self::to_proto::{serialize_expr, serialize_exprs};
79use crate::logical_plan::to_proto::serialize_sorts;
80use prost::bytes::BufMut;
81use prost::Message;
82
83pub mod file_formats;
84pub mod from_proto;
85pub mod to_proto;
86
87pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
88 fn try_decode(buf: &[u8]) -> Result<Self>
89 where
90 Self: Sized;
91
92 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
93 where
94 B: BufMut,
95 Self: Sized;
96
97 fn try_into_logical_plan(
98 &self,
99 ctx: &SessionContext,
100 extension_codec: &dyn LogicalExtensionCodec,
101 ) -> Result<LogicalPlan>;
102
103 fn try_from_logical_plan(
104 plan: &LogicalPlan,
105 extension_codec: &dyn LogicalExtensionCodec,
106 ) -> Result<Self>
107 where
108 Self: Sized;
109}
110
111pub trait LogicalExtensionCodec: Debug + Send + Sync {
112 fn try_decode(
113 &self,
114 buf: &[u8],
115 inputs: &[LogicalPlan],
116 ctx: &SessionContext,
117 ) -> Result<Extension>;
118
119 fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
120
121 fn try_decode_table_provider(
122 &self,
123 buf: &[u8],
124 table_ref: &TableReference,
125 schema: SchemaRef,
126 ctx: &SessionContext,
127 ) -> Result<Arc<dyn TableProvider>>;
128
129 fn try_encode_table_provider(
130 &self,
131 table_ref: &TableReference,
132 node: Arc<dyn TableProvider>,
133 buf: &mut Vec<u8>,
134 ) -> Result<()>;
135
136 fn try_decode_file_format(
137 &self,
138 _buf: &[u8],
139 _ctx: &SessionContext,
140 ) -> Result<Arc<dyn FileFormatFactory>> {
141 not_impl_err!("LogicalExtensionCodec is not provided for file format")
142 }
143
144 fn try_encode_file_format(
145 &self,
146 _buf: &mut Vec<u8>,
147 _node: Arc<dyn FileFormatFactory>,
148 ) -> Result<()> {
149 Ok(())
150 }
151
152 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
153 not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
154 }
155
156 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
157 Ok(())
158 }
159
160 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
161 not_impl_err!(
162 "LogicalExtensionCodec is not provided for aggregate function {name}"
163 )
164 }
165
166 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
167 Ok(())
168 }
169
170 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
171 not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
172 }
173
174 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
175 Ok(())
176 }
177}
178
179#[derive(Debug, Clone)]
180pub struct DefaultLogicalExtensionCodec {}
181
182impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
183 fn try_decode(
184 &self,
185 _buf: &[u8],
186 _inputs: &[LogicalPlan],
187 _ctx: &SessionContext,
188 ) -> Result<Extension> {
189 not_impl_err!("LogicalExtensionCodec is not provided")
190 }
191
192 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
193 not_impl_err!("LogicalExtensionCodec is not provided")
194 }
195
196 fn try_decode_table_provider(
197 &self,
198 _buf: &[u8],
199 _table_ref: &TableReference,
200 _schema: SchemaRef,
201 _ctx: &SessionContext,
202 ) -> Result<Arc<dyn TableProvider>> {
203 not_impl_err!("LogicalExtensionCodec is not provided")
204 }
205
206 fn try_encode_table_provider(
207 &self,
208 _table_ref: &TableReference,
209 _node: Arc<dyn TableProvider>,
210 _buf: &mut Vec<u8>,
211 ) -> Result<()> {
212 not_impl_err!("LogicalExtensionCodec is not provided")
213 }
214}
215
/// Decodes a required, boxed protobuf plan field into a [`LogicalPlan`].
///
/// `$PB` is an `Option<Box<N>>`-like field. `$CTX` is the `&SessionContext`
/// and `$CODEC` is the `&dyn LogicalExtensionCodec`. The macro evaluates to a
/// `Result<LogicalPlan>`, which is an error when the field is `None`.
///
/// The error constructor is referenced through `$crate`, so call sites do not
/// need `proto_error` in scope. The `AsLogicalPlan` trait still has to be in
/// scope for the `try_into_logical_plan` method call.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
226
227fn from_table_reference(
228 table_ref: Option<&protobuf::TableReference>,
229 error_context: &str,
230) -> Result<TableReference> {
231 let table_ref = table_ref.ok_or_else(|| {
232 DataFusionError::Internal(format!(
233 "Protobuf deserialization error, {error_context} was missing required field name."
234 ))
235 })?;
236
237 Ok(table_ref.clone().try_into()?)
238}
239
240fn to_table_source(
244 node: &Option<Box<LogicalPlanNode>>,
245 ctx: &SessionContext,
246 extension_codec: &dyn LogicalExtensionCodec,
247) -> Result<Arc<dyn TableSource>> {
248 if let Some(node) = node {
249 match node.try_into_logical_plan(ctx, extension_codec)? {
250 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
251 _ => plan_err!("expected TableScan node"),
252 }
253 } else {
254 plan_err!("LogicalPlanNode should be provided")
255 }
256}
257
258fn from_table_source(
262 table_name: TableReference,
263 target: Arc<dyn TableSource>,
264 extension_codec: &dyn LogicalExtensionCodec,
265) -> Result<LogicalPlanNode> {
266 let projected_schema = target.schema().to_dfschema_ref()?;
267 let r = LogicalPlan::TableScan(TableScan {
268 table_name,
269 source: target,
270 projection: None,
271 projected_schema,
272 filters: vec![],
273 fetch: None,
274 });
275
276 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
277}
278
279impl AsLogicalPlan for LogicalPlanNode {
280 fn try_decode(buf: &[u8]) -> Result<Self>
281 where
282 Self: Sized,
283 {
284 LogicalPlanNode::decode(buf).map_err(|e| {
285 DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
286 })
287 }
288
289 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
290 where
291 B: BufMut,
292 Self: Sized,
293 {
294 self.encode(buf).map_err(|e| {
295 DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
296 })
297 }
298
299 fn try_into_logical_plan(
300 &self,
301 ctx: &SessionContext,
302 extension_codec: &dyn LogicalExtensionCodec,
303 ) -> Result<LogicalPlan> {
304 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
305 proto_error(format!(
306 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
307 ))
308 })?;
309 match plan {
310 LogicalPlanType::Values(values) => {
311 let n_cols = values.n_cols as usize;
312 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
313 Ok(Vec::new())
314 } else if values.values_list.len() % n_cols != 0 {
315 internal_err!(
316 "Invalid values list length, expect {} to be divisible by {}",
317 values.values_list.len(),
318 n_cols
319 )
320 } else {
321 values
322 .values_list
323 .chunks_exact(n_cols)
324 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
325 .collect::<Result<Vec<_>, _>>()
326 .map_err(|e| e.into())
327 }?;
328
329 LogicalPlanBuilder::values(values)?.build()
330 }
331 LogicalPlanType::Projection(projection) => {
332 let input: LogicalPlan =
333 into_logical_plan!(projection.input, ctx, extension_codec)?;
334 let expr: Vec<Expr> =
335 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
336
337 let new_proj = project(input, expr)?;
338 match projection.optional_alias.as_ref() {
339 Some(a) => match a {
340 protobuf::projection_node::OptionalAlias::Alias(alias) => {
341 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
342 Arc::new(new_proj),
343 alias.clone(),
344 )?))
345 }
346 },
347 _ => Ok(new_proj),
348 }
349 }
350 LogicalPlanType::Selection(selection) => {
351 let input: LogicalPlan =
352 into_logical_plan!(selection.input, ctx, extension_codec)?;
353 let expr: Expr = selection
354 .expr
355 .as_ref()
356 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
357 .transpose()?
358 .ok_or_else(|| proto_error("expression required"))?;
359 LogicalPlanBuilder::from(input).filter(expr)?.build()
360 }
361 LogicalPlanType::Window(window) => {
362 let input: LogicalPlan =
363 into_logical_plan!(window.input, ctx, extension_codec)?;
364 let window_expr =
365 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
366 LogicalPlanBuilder::from(input).window(window_expr)?.build()
367 }
368 LogicalPlanType::Aggregate(aggregate) => {
369 let input: LogicalPlan =
370 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
371 let group_expr =
372 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
373 let aggr_expr =
374 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
375 LogicalPlanBuilder::from(input)
376 .aggregate(group_expr, aggr_expr)?
377 .build()
378 }
379 LogicalPlanType::ListingScan(scan) => {
380 let schema: Schema = convert_required!(scan.schema)?;
381
382 let mut projection = None;
383 if let Some(columns) = &scan.projection {
384 let column_indices = columns
385 .columns
386 .iter()
387 .map(|name| schema.index_of(name))
388 .collect::<Result<Vec<usize>, _>>()?;
389 projection = Some(column_indices);
390 }
391
392 let filters =
393 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
394
395 let mut all_sort_orders = vec![];
396 for order in &scan.file_sort_order {
397 all_sort_orders.push(from_proto::parse_sorts(
398 &order.sort_expr_nodes,
399 ctx,
400 extension_codec,
401 )?)
402 }
403
404 let file_format: Arc<dyn FileFormat> =
405 match scan.file_format_type.as_ref().ok_or_else(|| {
406 proto_error(format!(
407 "logical_plan::from_proto() Unsupported file format '{self:?}'"
408 ))
409 })? {
410 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
411 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
412 #[cfg(feature = "parquet")]
413 {
414 let mut parquet = ParquetFormat::default();
415 if let Some(options) = options {
416 parquet = parquet.with_options(options.try_into()?)
417 }
418 Arc::new(parquet)
419 }
420 #[cfg(not(feature = "parquet"))]
421 panic!("Unable to process parquet file since `parquet` feature is not enabled");
422 }
423 FileFormatType::Csv(protobuf::CsvFormat {
424 options
425 }) => {
426 let mut csv = CsvFormat::default();
427 if let Some(options) = options {
428 csv = csv.with_options(options.try_into()?)
429 }
430 Arc::new(csv)
431 },
432 FileFormatType::Json(protobuf::NdJsonFormat {
433 options
434 }) => {
435 let mut json = OtherNdJsonFormat::default();
436 if let Some(options) = options {
437 json = json.with_options(options.try_into()?)
438 }
439 Arc::new(json)
440 }
441 #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
442 FileFormatType::Avro(..) => {
443 #[cfg(feature = "avro")]
444 {
445 Arc::new(AvroFormat)
446 }
447 #[cfg(not(feature = "avro"))]
448 panic!("Unable to process avro file since `avro` feature is not enabled");
449 }
450 FileFormatType::Arrow(..) => {
451 Arc::new(ArrowFormat)
452 }
453 };
454
455 let table_paths = &scan
456 .paths
457 .iter()
458 .map(ListingTableUrl::parse)
459 .collect::<Result<Vec<_>, _>>()?;
460
461 let partition_columns = scan
462 .table_partition_cols
463 .iter()
464 .map(|col| {
465 let Some(arrow_type) = col.arrow_type.as_ref() else {
466 return Err(proto_error(
467 "Missing Arrow type in partition columns",
468 ));
469 };
470 let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
471 proto_error(format!("Received an unknown ArrowType: {e}"))
472 })?;
473 Ok((col.name.clone(), arrow_type))
474 })
475 .collect::<Result<Vec<_>>>()?;
476
477 let options = ListingOptions::new(file_format)
478 .with_file_extension(&scan.file_extension)
479 .with_table_partition_cols(partition_columns)
480 .with_collect_stat(scan.collect_stat)
481 .with_target_partitions(scan.target_partitions as usize)
482 .with_file_sort_order(all_sort_orders);
483
484 let config =
485 ListingTableConfig::new_with_multi_paths(table_paths.clone())
486 .with_listing_options(options)
487 .with_schema(Arc::new(schema));
488
489 let provider = ListingTable::try_new(config)?.with_cache(
490 ctx.state()
491 .runtime_env()
492 .cache_manager
493 .get_file_statistic_cache(),
494 );
495
496 let table_name =
497 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
498
499 LogicalPlanBuilder::scan_with_filters(
500 table_name,
501 provider_as_source(Arc::new(provider)),
502 projection,
503 filters,
504 )?
505 .build()
506 }
507 LogicalPlanType::CustomScan(scan) => {
508 let schema: Schema = convert_required!(scan.schema)?;
509 let schema = Arc::new(schema);
510 let mut projection = None;
511 if let Some(columns) = &scan.projection {
512 let column_indices = columns
513 .columns
514 .iter()
515 .map(|name| schema.index_of(name))
516 .collect::<Result<Vec<usize>, _>>()?;
517 projection = Some(column_indices);
518 }
519
520 let filters =
521 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
522
523 let table_name =
524 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
525
526 let provider = extension_codec.try_decode_table_provider(
527 &scan.custom_table_data,
528 &table_name,
529 schema,
530 ctx,
531 )?;
532
533 LogicalPlanBuilder::scan_with_filters(
534 table_name,
535 provider_as_source(provider),
536 projection,
537 filters,
538 )?
539 .build()
540 }
541 LogicalPlanType::Sort(sort) => {
542 let input: LogicalPlan =
543 into_logical_plan!(sort.input, ctx, extension_codec)?;
544 let sort_expr: Vec<SortExpr> =
545 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
546 let fetch: Option<usize> = sort.fetch.try_into().ok();
547 LogicalPlanBuilder::from(input)
548 .sort_with_limit(sort_expr, fetch)?
549 .build()
550 }
551 LogicalPlanType::Repartition(repartition) => {
552 use datafusion::logical_expr::Partitioning;
553 let input: LogicalPlan =
554 into_logical_plan!(repartition.input, ctx, extension_codec)?;
555 use protobuf::repartition_node::PartitionMethod;
556 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
557 internal_datafusion_err!(
558 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
559 )
560 })?;
561
562 let partitioning_scheme = match pb_partition_method {
563 PartitionMethod::Hash(protobuf::HashRepartition {
564 hash_expr: pb_hash_expr,
565 partition_count,
566 }) => Partitioning::Hash(
567 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
568 *partition_count as usize,
569 ),
570 PartitionMethod::RoundRobin(partition_count) => {
571 Partitioning::RoundRobinBatch(*partition_count as usize)
572 }
573 };
574
575 LogicalPlanBuilder::from(input)
576 .repartition(partitioning_scheme)?
577 .build()
578 }
579 LogicalPlanType::EmptyRelation(empty_relation) => {
580 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
581 }
582 LogicalPlanType::CreateExternalTable(create_extern_table) => {
583 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
584 DataFusionError::Internal(String::from(
585 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
586 ))
587 })?;
588
589 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
590 DataFusionError::Internal(String::from(
591 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
592 ))
593 })?;
594 let definition = if !create_extern_table.definition.is_empty() {
595 Some(create_extern_table.definition.clone())
596 } else {
597 None
598 };
599
600 let file_type = create_extern_table.file_type.as_str();
601 if ctx.table_factory(file_type).is_none() {
602 internal_err!("No TableProviderFactory for file type: {file_type}")?
603 }
604
605 let mut order_exprs = vec![];
606 for expr in &create_extern_table.order_exprs {
607 order_exprs.push(from_proto::parse_sorts(
608 &expr.sort_expr_nodes,
609 ctx,
610 extension_codec,
611 )?);
612 }
613
614 let mut column_defaults =
615 HashMap::with_capacity(create_extern_table.column_defaults.len());
616 for (col_name, expr) in &create_extern_table.column_defaults {
617 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
618 column_defaults.insert(col_name.clone(), expr);
619 }
620
621 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
622 CreateExternalTable {
623 schema: pb_schema.try_into()?,
624 name: from_table_reference(
625 create_extern_table.name.as_ref(),
626 "CreateExternalTable",
627 )?,
628 location: create_extern_table.location.clone(),
629 file_type: create_extern_table.file_type.clone(),
630 table_partition_cols: create_extern_table
631 .table_partition_cols
632 .clone(),
633 order_exprs,
634 if_not_exists: create_extern_table.if_not_exists,
635 temporary: create_extern_table.temporary,
636 definition,
637 unbounded: create_extern_table.unbounded,
638 options: create_extern_table.options.clone(),
639 constraints: constraints.into(),
640 column_defaults,
641 },
642 )))
643 }
644 LogicalPlanType::CreateView(create_view) => {
645 let plan = create_view
646 .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
647 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
648 )))?
649 .try_into_logical_plan(ctx, extension_codec)?;
650 let definition = if !create_view.definition.is_empty() {
651 Some(create_view.definition.clone())
652 } else {
653 None
654 };
655
656 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
657 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
658 temporary: create_view.temporary,
659 input: Arc::new(plan),
660 or_replace: create_view.or_replace,
661 definition,
662 })))
663 }
664 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
665 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
666 DataFusionError::Internal(String::from(
667 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
668 ))
669 })?;
670
671 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
672 CreateCatalogSchema {
673 schema_name: create_catalog_schema.schema_name.clone(),
674 if_not_exists: create_catalog_schema.if_not_exists,
675 schema: pb_schema.try_into()?,
676 },
677 )))
678 }
679 LogicalPlanType::CreateCatalog(create_catalog) => {
680 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
681 DataFusionError::Internal(String::from(
682 "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
683 ))
684 })?;
685
686 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
687 CreateCatalog {
688 catalog_name: create_catalog.catalog_name.clone(),
689 if_not_exists: create_catalog.if_not_exists,
690 schema: pb_schema.try_into()?,
691 },
692 )))
693 }
694 LogicalPlanType::Analyze(analyze) => {
695 let input: LogicalPlan =
696 into_logical_plan!(analyze.input, ctx, extension_codec)?;
697 LogicalPlanBuilder::from(input)
698 .explain(analyze.verbose, true)?
699 .build()
700 }
701 LogicalPlanType::Explain(explain) => {
702 let input: LogicalPlan =
703 into_logical_plan!(explain.input, ctx, extension_codec)?;
704 LogicalPlanBuilder::from(input)
705 .explain(explain.verbose, false)?
706 .build()
707 }
708 LogicalPlanType::SubqueryAlias(aliased_relation) => {
709 let input: LogicalPlan =
710 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
711 let alias = from_table_reference(
712 aliased_relation.alias.as_ref(),
713 "SubqueryAlias",
714 )?;
715 LogicalPlanBuilder::from(input).alias(alias)?.build()
716 }
717 LogicalPlanType::Limit(limit) => {
718 let input: LogicalPlan =
719 into_logical_plan!(limit.input, ctx, extension_codec)?;
720 let skip = limit.skip.max(0) as usize;
721
722 let fetch = if limit.fetch < 0 {
723 None
724 } else {
725 Some(limit.fetch as usize)
726 };
727
728 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
729 }
730 LogicalPlanType::Join(join) => {
731 let left_keys: Vec<Expr> =
732 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
733 let right_keys: Vec<Expr> =
734 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
735 let join_type =
736 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
737 proto_error(format!(
738 "Received a JoinNode message with unknown JoinType {}",
739 join.join_type
740 ))
741 })?;
742 let join_constraint = protobuf::JoinConstraint::try_from(
743 join.join_constraint,
744 )
745 .map_err(|_| {
746 proto_error(format!(
747 "Received a JoinNode message with unknown JoinConstraint {}",
748 join.join_constraint
749 ))
750 })?;
751 let filter: Option<Expr> = join
752 .filter
753 .as_ref()
754 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
755 .map_or(Ok(None), |v| v.map(Some))?;
756
757 let builder = LogicalPlanBuilder::from(into_logical_plan!(
758 join.left,
759 ctx,
760 extension_codec
761 )?);
762 let builder = match join_constraint.into() {
763 JoinConstraint::On => builder.join_with_expr_keys(
764 into_logical_plan!(join.right, ctx, extension_codec)?,
765 join_type.into(),
766 (left_keys, right_keys),
767 filter,
768 )?,
769 JoinConstraint::Using => {
770 let using_keys = left_keys
772 .into_iter()
773 .map(|key| {
774 key.try_as_col().cloned()
775 .ok_or_else(|| internal_datafusion_err!(
776 "Using join keys must be column references, got: {key:?}"
777 ))
778 })
779 .collect::<Result<Vec<_>, _>>()?;
780 builder.join_using(
781 into_logical_plan!(join.right, ctx, extension_codec)?,
782 join_type.into(),
783 using_keys,
784 )?
785 }
786 };
787
788 builder.build()
789 }
790 LogicalPlanType::Union(union) => {
791 if union.inputs.len() < 2 {
792 return Err( DataFusionError::Internal(String::from(
793 "Protobuf deserialization error, Union was require at least two input.",
794 )));
795 }
796 let (first, rest) = union.inputs.split_first().unwrap();
797 let mut builder = LogicalPlanBuilder::from(
798 first.try_into_logical_plan(ctx, extension_codec)?,
799 );
800
801 for i in rest {
802 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
803 builder = builder.union(plan)?;
804 }
805 builder.build()
806 }
807 LogicalPlanType::CrossJoin(crossjoin) => {
808 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
809 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
810
811 LogicalPlanBuilder::from(left).cross_join(right)?.build()
812 }
813 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
814 let input_plans: Vec<LogicalPlan> = inputs
815 .iter()
816 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
817 .collect::<Result<_>>()?;
818
819 let extension_node =
820 extension_codec.try_decode(node, &input_plans, ctx)?;
821 Ok(LogicalPlan::Extension(extension_node))
822 }
823 LogicalPlanType::Distinct(distinct) => {
824 let input: LogicalPlan =
825 into_logical_plan!(distinct.input, ctx, extension_codec)?;
826 LogicalPlanBuilder::from(input).distinct()?.build()
827 }
828 LogicalPlanType::DistinctOn(distinct_on) => {
829 let input: LogicalPlan =
830 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
831 let on_expr =
832 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
833 let select_expr = from_proto::parse_exprs(
834 &distinct_on.select_expr,
835 ctx,
836 extension_codec,
837 )?;
838 let sort_expr = match distinct_on.sort_expr.len() {
839 0 => None,
840 _ => Some(from_proto::parse_sorts(
841 &distinct_on.sort_expr,
842 ctx,
843 extension_codec,
844 )?),
845 };
846 LogicalPlanBuilder::from(input)
847 .distinct_on(on_expr, select_expr, sort_expr)?
848 .build()
849 }
850 LogicalPlanType::ViewScan(scan) => {
851 let schema: Schema = convert_required!(scan.schema)?;
852
853 let mut projection = None;
854 if let Some(columns) = &scan.projection {
855 let column_indices = columns
856 .columns
857 .iter()
858 .map(|name| schema.index_of(name))
859 .collect::<Result<Vec<usize>, _>>()?;
860 projection = Some(column_indices);
861 }
862
863 let input: LogicalPlan =
864 into_logical_plan!(scan.input, ctx, extension_codec)?;
865
866 let definition = if !scan.definition.is_empty() {
867 Some(scan.definition.clone())
868 } else {
869 None
870 };
871
872 let provider = ViewTable::new(input, definition);
873
874 let table_name =
875 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
876
877 LogicalPlanBuilder::scan(
878 table_name,
879 provider_as_source(Arc::new(provider)),
880 projection,
881 )?
882 .build()
883 }
884 LogicalPlanType::Prepare(prepare) => {
885 let input: LogicalPlan =
886 into_logical_plan!(prepare.input, ctx, extension_codec)?;
887 let data_types: Vec<DataType> = prepare
888 .data_types
889 .iter()
890 .map(DataType::try_from)
891 .collect::<Result<_, _>>()?;
892 LogicalPlanBuilder::from(input)
893 .prepare(prepare.name.clone(), data_types)?
894 .build()
895 }
896 LogicalPlanType::DropView(dropview) => {
897 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
898 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
899 if_exists: dropview.if_exists,
900 schema: Arc::new(convert_required!(dropview.schema)?),
901 })))
902 }
903 LogicalPlanType::CopyTo(copy) => {
904 let input: LogicalPlan =
905 into_logical_plan!(copy.input, ctx, extension_codec)?;
906
907 let file_type: Arc<dyn FileType> = format_as_file_type(
908 extension_codec.try_decode_file_format(©.file_type, ctx)?,
909 );
910
911 Ok(LogicalPlan::Copy(dml::CopyTo::new(
912 Arc::new(input),
913 copy.output_url.clone(),
914 copy.partition_by.clone(),
915 file_type,
916 Default::default(),
917 )))
918 }
919 LogicalPlanType::Unnest(unnest) => {
920 let input: LogicalPlan =
921 into_logical_plan!(unnest.input, ctx, extension_codec)?;
922
923 LogicalPlanBuilder::from(input)
924 .unnest_columns_with_options(
925 unnest.exec_columns.iter().map(|c| c.into()).collect(),
926 into_required!(unnest.options)?,
927 )?
928 .build()
929 }
930 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
931 let static_term = recursive_query_node
932 .static_term
933 .as_ref()
934 .ok_or_else(|| DataFusionError::Internal(String::from(
935 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
936 )))?
937 .try_into_logical_plan(ctx, extension_codec)?;
938
939 let recursive_term = recursive_query_node
940 .recursive_term
941 .as_ref()
942 .ok_or_else(|| DataFusionError::Internal(String::from(
943 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
944 )))?
945 .try_into_logical_plan(ctx, extension_codec)?;
946
947 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
948 name: recursive_query_node.name.clone(),
949 static_term: Arc::new(static_term),
950 recursive_term: Arc::new(recursive_term),
951 is_distinct: recursive_query_node.is_distinct,
952 }))
953 }
954 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
955 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
956 let schema = convert_required!(*schema)?;
957 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
958 LogicalPlanBuilder::scan(
959 name.as_str(),
960 provider_as_source(Arc::new(cte_work_table)),
961 None,
962 )?
963 .build()
964 }
965 LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
966 datafusion::logical_expr::DmlStatement::new(
967 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
968 to_table_source(&dml_node.target, ctx, extension_codec)?,
969 dml_node.dml_type().into(),
970 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
971 ),
972 )),
973 }
974 }
975
976 fn try_from_logical_plan(
977 plan: &LogicalPlan,
978 extension_codec: &dyn LogicalExtensionCodec,
979 ) -> Result<Self>
980 where
981 Self: Sized,
982 {
983 match plan {
984 LogicalPlan::Values(Values { values, .. }) => {
985 let n_cols = if values.is_empty() {
986 0
987 } else {
988 values[0].len()
989 } as u64;
990 let values_list =
991 serialize_exprs(values.iter().flatten(), extension_codec)?;
992 Ok(LogicalPlanNode {
993 logical_plan_type: Some(LogicalPlanType::Values(
994 protobuf::ValuesNode {
995 n_cols,
996 values_list,
997 },
998 )),
999 })
1000 }
1001 LogicalPlan::TableScan(TableScan {
1002 table_name,
1003 source,
1004 filters,
1005 projection,
1006 ..
1007 }) => {
1008 let provider = source_as_provider(source)?;
1009 let schema = provider.schema();
1010 let source = provider.as_any();
1011
1012 let projection = match projection {
1013 None => None,
1014 Some(columns) => {
1015 let column_names = columns
1016 .iter()
1017 .map(|i| schema.field(*i).name().to_owned())
1018 .collect();
1019 Some(protobuf::ProjectionColumns {
1020 columns: column_names,
1021 })
1022 }
1023 };
1024
1025 let filters: Vec<protobuf::LogicalExprNode> =
1026 serialize_exprs(filters, extension_codec)?;
1027
1028 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1029 let any = listing_table.options().format.as_any();
1030 let file_format_type = {
1031 let mut maybe_some_type = None;
1032
1033 #[cfg(feature = "parquet")]
1034 if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1035 let options = parquet.options();
1036 maybe_some_type =
1037 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1038 options: Some(options.try_into()?),
1039 }));
1040 };
1041
1042 if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1043 let options = csv.options();
1044 maybe_some_type =
1045 Some(FileFormatType::Csv(protobuf::CsvFormat {
1046 options: Some(options.try_into()?),
1047 }));
1048 }
1049
1050 if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1051 let options = json.options();
1052 maybe_some_type =
1053 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1054 options: Some(options.try_into()?),
1055 }))
1056 }
1057
1058 #[cfg(feature = "avro")]
1059 if any.is::<AvroFormat>() {
1060 maybe_some_type =
1061 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1062 }
1063
1064 if any.is::<ArrowFormat>() {
1065 maybe_some_type =
1066 Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1067 }
1068
1069 if let Some(file_format_type) = maybe_some_type {
1070 file_format_type
1071 } else {
1072 return Err(proto_error(format!(
1073 "Error deserializing unknown file format: {:?}",
1074 listing_table.options().format
1075 )));
1076 }
1077 };
1078
1079 let options = listing_table.options();
1080
1081 let mut builder = SchemaBuilder::from(schema.as_ref());
1082 for (idx, field) in schema.fields().iter().enumerate().rev() {
1083 if options
1084 .table_partition_cols
1085 .iter()
1086 .any(|(name, _)| name == field.name())
1087 {
1088 builder.remove(idx);
1089 }
1090 }
1091
1092 let schema = builder.finish();
1093
1094 let schema: protobuf::Schema = (&schema).try_into()?;
1095
1096 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1097 for order in &options.file_sort_order {
1098 let expr_vec = SortExprNodeCollection {
1099 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1100 };
1101 exprs_vec.push(expr_vec);
1102 }
1103
1104 let partition_columns = options
1105 .table_partition_cols
1106 .iter()
1107 .map(|(name, arrow_type)| {
1108 let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1109 .map_err(|e| {
1110 proto_error(format!(
1111 "Received an unknown ArrowType: {e}"
1112 ))
1113 })?;
1114 Ok(protobuf::PartitionColumn {
1115 name: name.clone(),
1116 arrow_type: Some(arrow_type),
1117 })
1118 })
1119 .collect::<Result<Vec<_>>>()?;
1120
1121 Ok(LogicalPlanNode {
1122 logical_plan_type: Some(LogicalPlanType::ListingScan(
1123 protobuf::ListingTableScanNode {
1124 file_format_type: Some(file_format_type),
1125 table_name: Some(table_name.clone().into()),
1126 collect_stat: options.collect_stat,
1127 file_extension: options.file_extension.clone(),
1128 table_partition_cols: partition_columns,
1129 paths: listing_table
1130 .table_paths()
1131 .iter()
1132 .map(|x| x.to_string())
1133 .collect(),
1134 schema: Some(schema),
1135 projection,
1136 filters,
1137 target_partitions: options.target_partitions as u32,
1138 file_sort_order: exprs_vec,
1139 },
1140 )),
1141 })
1142 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1143 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1144 Ok(LogicalPlanNode {
1145 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1146 protobuf::ViewTableScanNode {
1147 table_name: Some(table_name.clone().into()),
1148 input: Some(Box::new(
1149 LogicalPlanNode::try_from_logical_plan(
1150 view_table.logical_plan(),
1151 extension_codec,
1152 )?,
1153 )),
1154 schema: Some(schema),
1155 projection,
1156 definition: view_table
1157 .definition()
1158 .map(|s| s.to_string())
1159 .unwrap_or_default(),
1160 },
1161 ))),
1162 })
1163 } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1164 {
1165 let name = cte_work_table.name().to_string();
1166 let schema = cte_work_table.schema();
1167 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1168
1169 Ok(LogicalPlanNode {
1170 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1171 protobuf::CteWorkTableScanNode {
1172 name,
1173 schema: Some(schema),
1174 },
1175 )),
1176 })
1177 } else {
1178 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1179 let mut bytes = vec![];
1180 extension_codec
1181 .try_encode_table_provider(table_name, provider, &mut bytes)
1182 .map_err(|e| context!("Error serializing custom table", e))?;
1183 let scan = CustomScan(CustomTableScanNode {
1184 table_name: Some(table_name.clone().into()),
1185 projection,
1186 schema: Some(schema),
1187 filters,
1188 custom_table_data: bytes,
1189 });
1190 let node = LogicalPlanNode {
1191 logical_plan_type: Some(scan),
1192 };
1193 Ok(node)
1194 }
1195 }
1196 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1197 Ok(LogicalPlanNode {
1198 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1199 protobuf::ProjectionNode {
1200 input: Some(Box::new(
1201 LogicalPlanNode::try_from_logical_plan(
1202 input.as_ref(),
1203 extension_codec,
1204 )?,
1205 )),
1206 expr: serialize_exprs(expr, extension_codec)?,
1207 optional_alias: None,
1208 },
1209 ))),
1210 })
1211 }
1212 LogicalPlan::Filter(filter) => {
1213 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1214 filter.input.as_ref(),
1215 extension_codec,
1216 )?;
1217 Ok(LogicalPlanNode {
1218 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1219 protobuf::SelectionNode {
1220 input: Some(Box::new(input)),
1221 expr: Some(serialize_expr(
1222 &filter.predicate,
1223 extension_codec,
1224 )?),
1225 },
1226 ))),
1227 })
1228 }
1229 LogicalPlan::Distinct(Distinct::All(input)) => {
1230 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1231 input.as_ref(),
1232 extension_codec,
1233 )?;
1234 Ok(LogicalPlanNode {
1235 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1236 protobuf::DistinctNode {
1237 input: Some(Box::new(input)),
1238 },
1239 ))),
1240 })
1241 }
1242 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1243 on_expr,
1244 select_expr,
1245 sort_expr,
1246 input,
1247 ..
1248 })) => {
1249 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1250 input.as_ref(),
1251 extension_codec,
1252 )?;
1253 let sort_expr = match sort_expr {
1254 None => vec![],
1255 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1256 };
1257 Ok(LogicalPlanNode {
1258 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1259 protobuf::DistinctOnNode {
1260 on_expr: serialize_exprs(on_expr, extension_codec)?,
1261 select_expr: serialize_exprs(select_expr, extension_codec)?,
1262 sort_expr,
1263 input: Some(Box::new(input)),
1264 },
1265 ))),
1266 })
1267 }
1268 LogicalPlan::Window(Window {
1269 input, window_expr, ..
1270 }) => {
1271 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1272 input.as_ref(),
1273 extension_codec,
1274 )?;
1275 Ok(LogicalPlanNode {
1276 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1277 protobuf::WindowNode {
1278 input: Some(Box::new(input)),
1279 window_expr: serialize_exprs(window_expr, extension_codec)?,
1280 },
1281 ))),
1282 })
1283 }
1284 LogicalPlan::Aggregate(Aggregate {
1285 group_expr,
1286 aggr_expr,
1287 input,
1288 ..
1289 }) => {
1290 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1291 input.as_ref(),
1292 extension_codec,
1293 )?;
1294 Ok(LogicalPlanNode {
1295 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1296 protobuf::AggregateNode {
1297 input: Some(Box::new(input)),
1298 group_expr: serialize_exprs(group_expr, extension_codec)?,
1299 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1300 },
1301 ))),
1302 })
1303 }
1304 LogicalPlan::Join(Join {
1305 left,
1306 right,
1307 on,
1308 filter,
1309 join_type,
1310 join_constraint,
1311 null_equality,
1312 ..
1313 }) => {
1314 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1315 left.as_ref(),
1316 extension_codec,
1317 )?;
1318 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1319 right.as_ref(),
1320 extension_codec,
1321 )?;
1322 let (left_join_key, right_join_key) = on
1323 .iter()
1324 .map(|(l, r)| {
1325 Ok((
1326 serialize_expr(l, extension_codec)?,
1327 serialize_expr(r, extension_codec)?,
1328 ))
1329 })
1330 .collect::<Result<Vec<_>, ToProtoError>>()?
1331 .into_iter()
1332 .unzip();
1333 let join_type: protobuf::JoinType = join_type.to_owned().into();
1334 let join_constraint: protobuf::JoinConstraint =
1335 join_constraint.to_owned().into();
1336 let null_equality: protobuf::NullEquality =
1337 null_equality.to_owned().into();
1338 let filter = filter
1339 .as_ref()
1340 .map(|e| serialize_expr(e, extension_codec))
1341 .map_or(Ok(None), |v| v.map(Some))?;
1342 Ok(LogicalPlanNode {
1343 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1344 protobuf::JoinNode {
1345 left: Some(Box::new(left)),
1346 right: Some(Box::new(right)),
1347 join_type: join_type.into(),
1348 join_constraint: join_constraint.into(),
1349 left_join_key,
1350 right_join_key,
1351 null_equality: null_equality.into(),
1352 filter,
1353 },
1354 ))),
1355 })
1356 }
1357 LogicalPlan::Subquery(_) => {
1358 not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1359 }
1360 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1361 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1362 input.as_ref(),
1363 extension_codec,
1364 )?;
1365 Ok(LogicalPlanNode {
1366 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1367 protobuf::SubqueryAliasNode {
1368 input: Some(Box::new(input)),
1369 alias: Some((*alias).clone().into()),
1370 },
1371 ))),
1372 })
1373 }
1374 LogicalPlan::Limit(limit) => {
1375 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1376 limit.input.as_ref(),
1377 extension_codec,
1378 )?;
1379 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1380 return Err(proto_error(
1381 "LogicalPlan::Limit only supports literal skip values",
1382 ));
1383 };
1384 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1385 return Err(proto_error(
1386 "LogicalPlan::Limit only supports literal fetch values",
1387 ));
1388 };
1389
1390 Ok(LogicalPlanNode {
1391 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1392 protobuf::LimitNode {
1393 input: Some(Box::new(input)),
1394 skip: skip as i64,
1395 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1396 },
1397 ))),
1398 })
1399 }
1400 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1401 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1402 input.as_ref(),
1403 extension_codec,
1404 )?;
1405 let sort_expr: Vec<protobuf::SortExprNode> =
1406 serialize_sorts(expr, extension_codec)?;
1407 Ok(LogicalPlanNode {
1408 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1409 protobuf::SortNode {
1410 input: Some(Box::new(input)),
1411 expr: sort_expr,
1412 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1413 },
1414 ))),
1415 })
1416 }
1417 LogicalPlan::Repartition(Repartition {
1418 input,
1419 partitioning_scheme,
1420 }) => {
1421 use datafusion::logical_expr::Partitioning;
1422 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1423 input.as_ref(),
1424 extension_codec,
1425 )?;
1426
1427 use protobuf::repartition_node::PartitionMethod;
1430
1431 let pb_partition_method = match partitioning_scheme {
1432 Partitioning::Hash(exprs, partition_count) => {
1433 PartitionMethod::Hash(protobuf::HashRepartition {
1434 hash_expr: serialize_exprs(exprs, extension_codec)?,
1435 partition_count: *partition_count as u64,
1436 })
1437 }
1438 Partitioning::RoundRobinBatch(partition_count) => {
1439 PartitionMethod::RoundRobin(*partition_count as u64)
1440 }
1441 Partitioning::DistributeBy(_) => {
1442 return not_impl_err!("DistributeBy")
1443 }
1444 };
1445
1446 Ok(LogicalPlanNode {
1447 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1448 protobuf::RepartitionNode {
1449 input: Some(Box::new(input)),
1450 partition_method: Some(pb_partition_method),
1451 },
1452 ))),
1453 })
1454 }
1455 LogicalPlan::EmptyRelation(EmptyRelation {
1456 produce_one_row, ..
1457 }) => Ok(LogicalPlanNode {
1458 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1459 protobuf::EmptyRelationNode {
1460 produce_one_row: *produce_one_row,
1461 },
1462 )),
1463 }),
1464 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1465 CreateExternalTable {
1466 name,
1467 location,
1468 file_type,
1469 schema: df_schema,
1470 table_partition_cols,
1471 if_not_exists,
1472 definition,
1473 order_exprs,
1474 unbounded,
1475 options,
1476 constraints,
1477 column_defaults,
1478 temporary,
1479 },
1480 )) => {
1481 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1482 for order in order_exprs {
1483 let temp = SortExprNodeCollection {
1484 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1485 };
1486 converted_order_exprs.push(temp);
1487 }
1488
1489 let mut converted_column_defaults =
1490 HashMap::with_capacity(column_defaults.len());
1491 for (col_name, expr) in column_defaults {
1492 converted_column_defaults
1493 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1494 }
1495
1496 Ok(LogicalPlanNode {
1497 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1498 protobuf::CreateExternalTableNode {
1499 name: Some(name.clone().into()),
1500 location: location.clone(),
1501 file_type: file_type.clone(),
1502 schema: Some(df_schema.try_into()?),
1503 table_partition_cols: table_partition_cols.clone(),
1504 if_not_exists: *if_not_exists,
1505 temporary: *temporary,
1506 order_exprs: converted_order_exprs,
1507 definition: definition.clone().unwrap_or_default(),
1508 unbounded: *unbounded,
1509 options: options.clone(),
1510 constraints: Some(constraints.clone().into()),
1511 column_defaults: converted_column_defaults,
1512 },
1513 )),
1514 })
1515 }
1516 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1517 name,
1518 input,
1519 or_replace,
1520 definition,
1521 temporary,
1522 })) => Ok(LogicalPlanNode {
1523 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1524 protobuf::CreateViewNode {
1525 name: Some(name.clone().into()),
1526 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1527 input,
1528 extension_codec,
1529 )?)),
1530 or_replace: *or_replace,
1531 temporary: *temporary,
1532 definition: definition.clone().unwrap_or_default(),
1533 },
1534 ))),
1535 }),
1536 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1537 CreateCatalogSchema {
1538 schema_name,
1539 if_not_exists,
1540 schema: df_schema,
1541 },
1542 )) => Ok(LogicalPlanNode {
1543 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1544 protobuf::CreateCatalogSchemaNode {
1545 schema_name: schema_name.clone(),
1546 if_not_exists: *if_not_exists,
1547 schema: Some(df_schema.try_into()?),
1548 },
1549 )),
1550 }),
1551 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1552 catalog_name,
1553 if_not_exists,
1554 schema: df_schema,
1555 })) => Ok(LogicalPlanNode {
1556 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1557 protobuf::CreateCatalogNode {
1558 catalog_name: catalog_name.clone(),
1559 if_not_exists: *if_not_exists,
1560 schema: Some(df_schema.try_into()?),
1561 },
1562 )),
1563 }),
1564 LogicalPlan::Analyze(a) => {
1565 let input = LogicalPlanNode::try_from_logical_plan(
1566 a.input.as_ref(),
1567 extension_codec,
1568 )?;
1569 Ok(LogicalPlanNode {
1570 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1571 protobuf::AnalyzeNode {
1572 input: Some(Box::new(input)),
1573 verbose: a.verbose,
1574 },
1575 ))),
1576 })
1577 }
1578 LogicalPlan::Explain(a) => {
1579 let input = LogicalPlanNode::try_from_logical_plan(
1580 a.plan.as_ref(),
1581 extension_codec,
1582 )?;
1583 Ok(LogicalPlanNode {
1584 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1585 protobuf::ExplainNode {
1586 input: Some(Box::new(input)),
1587 verbose: a.verbose,
1588 },
1589 ))),
1590 })
1591 }
1592 LogicalPlan::Union(union) => {
1593 let inputs: Vec<LogicalPlanNode> = union
1594 .inputs
1595 .iter()
1596 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1597 .collect::<Result<_>>()?;
1598 Ok(LogicalPlanNode {
1599 logical_plan_type: Some(LogicalPlanType::Union(
1600 protobuf::UnionNode { inputs },
1601 )),
1602 })
1603 }
1604 LogicalPlan::Extension(extension) => {
1605 let mut buf: Vec<u8> = vec![];
1606 extension_codec.try_encode(extension, &mut buf)?;
1607
1608 let inputs: Vec<LogicalPlanNode> = extension
1609 .node
1610 .inputs()
1611 .iter()
1612 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1613 .collect::<Result<_>>()?;
1614
1615 Ok(LogicalPlanNode {
1616 logical_plan_type: Some(LogicalPlanType::Extension(
1617 LogicalExtensionNode { node: buf, inputs },
1618 )),
1619 })
1620 }
1621 LogicalPlan::Statement(Statement::Prepare(Prepare {
1622 name,
1623 data_types,
1624 input,
1625 })) => {
1626 let input =
1627 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1628 Ok(LogicalPlanNode {
1629 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1630 protobuf::PrepareNode {
1631 name: name.clone(),
1632 data_types: data_types
1633 .iter()
1634 .map(|t| t.try_into())
1635 .collect::<Result<Vec<_>, _>>()?,
1636 input: Some(Box::new(input)),
1637 },
1638 ))),
1639 })
1640 }
1641 LogicalPlan::Unnest(Unnest {
1642 input,
1643 exec_columns,
1644 list_type_columns,
1645 struct_type_columns,
1646 dependency_indices,
1647 schema,
1648 options,
1649 }) => {
1650 let input =
1651 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1652 let proto_unnest_list_items = list_type_columns
1653 .iter()
1654 .map(|(index, ul)| ColumnUnnestListItem {
1655 input_index: *index as _,
1656 recursion: Some(ColumnUnnestListRecursion {
1657 output_column: Some(ul.output_column.to_owned().into()),
1658 depth: ul.depth as _,
1659 }),
1660 })
1661 .collect();
1662 Ok(LogicalPlanNode {
1663 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1664 protobuf::UnnestNode {
1665 input: Some(Box::new(input)),
1666 exec_columns: exec_columns
1667 .iter()
1668 .map(|col| col.into())
1669 .collect(),
1670 list_type_columns: proto_unnest_list_items,
1671 struct_type_columns: struct_type_columns
1672 .iter()
1673 .map(|c| *c as u64)
1674 .collect(),
1675 dependency_indices: dependency_indices
1676 .iter()
1677 .map(|c| *c as u64)
1678 .collect(),
1679 schema: Some(schema.try_into()?),
1680 options: Some(options.into()),
1681 },
1682 ))),
1683 })
1684 }
1685 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1686 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1687 )),
1688 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1689 "LogicalPlan serde is not yet implemented for CreateIndex",
1690 )),
1691 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1692 "LogicalPlan serde is not yet implemented for DropTable",
1693 )),
1694 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1695 name,
1696 if_exists,
1697 schema,
1698 })) => Ok(LogicalPlanNode {
1699 logical_plan_type: Some(LogicalPlanType::DropView(
1700 protobuf::DropViewNode {
1701 name: Some(name.clone().into()),
1702 if_exists: *if_exists,
1703 schema: Some(schema.try_into()?),
1704 },
1705 )),
1706 }),
1707 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1708 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1709 )),
1710 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1711 "LogicalPlan serde is not yet implemented for CreateFunction",
1712 )),
1713 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1714 "LogicalPlan serde is not yet implemented for DropFunction",
1715 )),
1716 LogicalPlan::Statement(_) => Err(proto_error(
1717 "LogicalPlan serde is not yet implemented for Statement",
1718 )),
1719 LogicalPlan::Dml(DmlStatement {
1720 table_name,
1721 target,
1722 op,
1723 input,
1724 ..
1725 }) => {
1726 let input =
1727 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1728 let dml_type: dml_node::Type = op.into();
1729 Ok(LogicalPlanNode {
1730 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1731 input: Some(Box::new(input)),
1732 target: Some(Box::new(from_table_source(
1733 table_name.clone(),
1734 Arc::clone(target),
1735 extension_codec,
1736 )?)),
1737 table_name: Some(table_name.clone().into()),
1738 dml_type: dml_type.into(),
1739 }))),
1740 })
1741 }
1742 LogicalPlan::Copy(dml::CopyTo {
1743 input,
1744 output_url,
1745 file_type,
1746 partition_by,
1747 ..
1748 }) => {
1749 let input =
1750 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1751 let mut buf = Vec::new();
1752 extension_codec
1753 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1754
1755 Ok(LogicalPlanNode {
1756 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1757 protobuf::CopyToNode {
1758 input: Some(Box::new(input)),
1759 output_url: output_url.to_string(),
1760 file_type: buf,
1761 partition_by: partition_by.clone(),
1762 },
1763 ))),
1764 })
1765 }
1766 LogicalPlan::DescribeTable(_) => Err(proto_error(
1767 "LogicalPlan serde is not yet implemented for DescribeTable",
1768 )),
1769 LogicalPlan::RecursiveQuery(recursive) => {
1770 let static_term = LogicalPlanNode::try_from_logical_plan(
1771 recursive.static_term.as_ref(),
1772 extension_codec,
1773 )?;
1774 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1775 recursive.recursive_term.as_ref(),
1776 extension_codec,
1777 )?;
1778
1779 Ok(LogicalPlanNode {
1780 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1781 protobuf::RecursiveQueryNode {
1782 name: recursive.name.clone(),
1783 static_term: Some(Box::new(static_term)),
1784 recursive_term: Some(Box::new(recursive_term)),
1785 is_distinct: recursive.is_distinct,
1786 },
1787 ))),
1788 })
1789 }
1790 }
1791 }
1792}