1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection, dml_node,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, LogicalExtensionNode, LogicalPlanNode,
31 listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType,
32 },
33};
34
35use crate::protobuf::{ToProtoError, proto_error};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_common::file_options::file_type::FileType;
39use datafusion_common::{
40 Result, TableReference, ToDFSchema, assert_or_internal_err, context,
41 internal_datafusion_err, internal_err, not_impl_err, plan_err,
42};
43use datafusion_datasource::file_format::FileFormat;
44use datafusion_datasource::file_format::{
45 FileFormatFactory, file_type_to_format, format_as_file_type,
46};
47use datafusion_datasource_arrow::file_format::ArrowFormat;
48#[cfg(feature = "avro")]
49use datafusion_datasource_avro::file_format::AvroFormat;
50use datafusion_datasource_csv::file_format::CsvFormat;
51use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
52#[cfg(feature = "parquet")]
53use datafusion_datasource_parquet::file_format::ParquetFormat;
54use datafusion_expr::{
55 AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
56};
57use datafusion_expr::{
58 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
59 Statement, WindowUDF, dml,
60 logical_plan::{
61 Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
62 DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare,
63 Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
64 builder::project,
65 },
66};
67
68use self::to_proto::{serialize_expr, serialize_exprs};
69use crate::logical_plan::to_proto::serialize_sorts;
70use datafusion_catalog::TableProvider;
71use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
72use datafusion_catalog::view::ViewTable;
73use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
74use datafusion_datasource::ListingTableUrl;
75use datafusion_execution::TaskContext;
76use prost::Message;
77use prost::bytes::BufMut;
78
79pub mod file_formats;
80pub mod from_proto;
81pub mod to_proto;
82
/// Conversion contract between a serializable plan representation and a
/// DataFusion [`LogicalPlan`].
///
/// An implementor can be decoded from and encoded to raw bytes, and converted
/// to and from a [`LogicalPlan`]. Both plan conversions receive a
/// [`LogicalExtensionCodec`] for the pieces the implementor cannot encode itself.
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance of the implementing type from the bytes in `buf`.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into the writable buffer `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts `self` into a [`LogicalPlan`].
    ///
    /// `ctx` is the task context handed to the conversion and
    /// `extension_codec` is the codec used for user-defined extensions.
    fn try_into_logical_plan(
        &self,
        ctx: &TaskContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan>;

    /// Builds an instance of the implementing type from `plan`, using
    /// `extension_codec` for user-defined extensions.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
106
/// Hook for (de)serializing the parts of a logical plan that have no built-in
/// protobuf representation: extension nodes, table providers, file formats and
/// user-defined functions.
///
/// The extension-node and table-provider methods are required. The file-format
/// and UDF methods have defaults: the decode defaults return a
/// "not implemented" error and the encode defaults write nothing and return
/// `Ok(())`.
pub trait LogicalExtensionCodec: Debug + Send + Sync {
    /// Decodes an [`Extension`] node from `buf`; `inputs` are the already
    /// decoded child plans of that node.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[LogicalPlan],
        ctx: &TaskContext,
    ) -> Result<Extension>;

    /// Encodes the extension `node` into `buf`.
    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a table provider from `buf` for the table named `table_ref`
    /// with the given `schema`.
    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        table_ref: &TableReference,
        schema: SchemaRef,
        ctx: &TaskContext,
    ) -> Result<Arc<dyn TableProvider>>;

    /// Encodes the table provider `node`, registered as `table_ref`, into `buf`.
    fn try_encode_table_provider(
        &self,
        table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<()>;

    /// Decodes a file format factory from `_buf`.
    ///
    /// The default implementation always returns a "not implemented" error.
    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        not_impl_err!("LogicalExtensionCodec is not provided for file format")
    }

    /// Encodes the file format factory `_node` into `_buf`.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> Result<()> {
        Ok(())
    }

    /// Decodes the scalar UDF called `name` from `_buf`.
    ///
    /// The default implementation always returns a "not implemented" error.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes the scalar UDF `_node` into `_buf`.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the aggregate UDF called `name` from `_buf`.
    ///
    /// The default implementation always returns a "not implemented" error.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "LogicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes the aggregate UDF `_node` into `_buf`.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF called `name` from `_buf`.
    ///
    /// The default implementation always returns a "not implemented" error.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes the window UDF `_node` into `_buf`.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
174
/// A stateless [`LogicalExtensionCodec`] for plans that use no extensions.
///
/// Its implementation of the required codec methods returns a
/// "not implemented" error for every call.
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}
177
/// Rejects every extension node and table provider: all four required methods
/// return the same "not implemented" error. The optional file-format and UDF
/// methods are not overridden, so the trait defaults apply.
impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
    /// Always fails: extension nodes cannot be decoded without a real codec.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[LogicalPlan],
        _ctx: &TaskContext,
    ) -> Result<Extension> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    /// Always fails: extension nodes cannot be encoded without a real codec.
    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    /// Always fails: table providers cannot be decoded without a real codec.
    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: SchemaRef,
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn TableProvider>> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    /// Always fails: table providers cannot be encoded without a real codec.
    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }
}
211
/// Converts an optional, boxed protobuf plan field into a logical plan.
///
/// `$PB` is the optional field, `$CTX` and `$CODEC` are forwarded unchanged to
/// `try_into_logical_plan`. Evaluates to the result of that call when the
/// field is present, and to `Err(proto_error(..))` when it is absent.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(node) => node.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err(proto_error("Missing required field in protobuf")),
        }
    }};
}
222
223fn from_table_reference(
224 table_ref: Option<&protobuf::TableReference>,
225 error_context: &str,
226) -> Result<TableReference> {
227 let table_ref = table_ref.ok_or_else(|| {
228 internal_datafusion_err!(
229 "Protobuf deserialization error, {error_context} was missing required field name."
230 )
231 })?;
232
233 Ok(table_ref.clone().try_into()?)
234}
235
236fn to_table_source(
240 node: &Option<Box<LogicalPlanNode>>,
241 ctx: &TaskContext,
242 extension_codec: &dyn LogicalExtensionCodec,
243) -> Result<Arc<dyn TableSource>> {
244 if let Some(node) = node {
245 match node.try_into_logical_plan(ctx, extension_codec)? {
246 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
247 _ => plan_err!("expected TableScan node"),
248 }
249 } else {
250 plan_err!("LogicalPlanNode should be provided")
251 }
252}
253
254fn from_table_source(
258 table_name: TableReference,
259 target: Arc<dyn TableSource>,
260 extension_codec: &dyn LogicalExtensionCodec,
261) -> Result<LogicalPlanNode> {
262 let projected_schema = target.schema().to_dfschema_ref()?;
263 let r = LogicalPlan::TableScan(TableScan {
264 table_name,
265 source: target,
266 projection: None,
267 projected_schema,
268 filters: vec![],
269 fetch: None,
270 });
271
272 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
273}
274
275impl AsLogicalPlan for LogicalPlanNode {
276 fn try_decode(buf: &[u8]) -> Result<Self>
277 where
278 Self: Sized,
279 {
280 LogicalPlanNode::decode(buf)
281 .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
282 }
283
284 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
285 where
286 B: BufMut,
287 Self: Sized,
288 {
289 self.encode(buf)
290 .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
291 }
292
293 fn try_into_logical_plan(
294 &self,
295 ctx: &TaskContext,
296 extension_codec: &dyn LogicalExtensionCodec,
297 ) -> Result<LogicalPlan> {
298 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
299 proto_error(format!(
300 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
301 ))
302 })?;
303 match plan {
304 LogicalPlanType::Values(values) => {
305 let n_cols = values.n_cols as usize;
306 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
307 Ok(Vec::new())
308 } else if values.values_list.len() % n_cols != 0 {
309 internal_err!(
310 "Invalid values list length, expect {} to be divisible by {}",
311 values.values_list.len(),
312 n_cols
313 )
314 } else {
315 values
316 .values_list
317 .chunks_exact(n_cols)
318 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
319 .collect::<Result<Vec<_>, _>>()
320 .map_err(|e| e.into())
321 }?;
322
323 LogicalPlanBuilder::values(values)?.build()
324 }
325 LogicalPlanType::Projection(projection) => {
326 let input: LogicalPlan =
327 into_logical_plan!(projection.input, ctx, extension_codec)?;
328 let expr: Vec<Expr> =
329 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
330
331 let new_proj = project(input, expr)?;
332 match projection.optional_alias.as_ref() {
333 Some(a) => match a {
334 protobuf::projection_node::OptionalAlias::Alias(alias) => {
335 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
336 Arc::new(new_proj),
337 alias.clone(),
338 )?))
339 }
340 },
341 _ => Ok(new_proj),
342 }
343 }
344 LogicalPlanType::Selection(selection) => {
345 let input: LogicalPlan =
346 into_logical_plan!(selection.input, ctx, extension_codec)?;
347 let expr: Expr = selection
348 .expr
349 .as_ref()
350 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
351 .transpose()?
352 .ok_or_else(|| proto_error("expression required"))?;
353 LogicalPlanBuilder::from(input).filter(expr)?.build()
354 }
355 LogicalPlanType::Window(window) => {
356 let input: LogicalPlan =
357 into_logical_plan!(window.input, ctx, extension_codec)?;
358 let window_expr =
359 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
360 LogicalPlanBuilder::from(input).window(window_expr)?.build()
361 }
362 LogicalPlanType::Aggregate(aggregate) => {
363 let input: LogicalPlan =
364 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
365 let group_expr =
366 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
367 let aggr_expr =
368 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
369 LogicalPlanBuilder::from(input)
370 .aggregate(group_expr, aggr_expr)?
371 .build()
372 }
373 LogicalPlanType::ListingScan(scan) => {
374 let schema: Schema = convert_required!(scan.schema)?;
375
376 let filters =
377 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
378
379 let mut all_sort_orders = vec![];
380 for order in &scan.file_sort_order {
381 all_sort_orders.push(from_proto::parse_sorts(
382 &order.sort_expr_nodes,
383 ctx,
384 extension_codec,
385 )?)
386 }
387
388 let file_format: Arc<dyn FileFormat> =
389 match scan.file_format_type.as_ref().ok_or_else(|| {
390 proto_error(format!(
391 "logical_plan::from_proto() Unsupported file format '{self:?}'"
392 ))
393 })? {
394 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
395 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
396 #[cfg(feature = "parquet")]
397 {
398 let mut parquet = ParquetFormat::default();
399 if let Some(options) = options {
400 parquet = parquet.with_options(options.try_into()?)
401 }
402 Arc::new(parquet)
403 }
404 #[cfg(not(feature = "parquet"))]
405 panic!("Unable to process parquet file since `parquet` feature is not enabled");
406 }
407 FileFormatType::Csv(protobuf::CsvFormat {
408 options
409 }) => {
410 let mut csv = CsvFormat::default();
411 if let Some(options) = options {
412 csv = csv.with_options(options.try_into()?)
413 }
414 Arc::new(csv)
415 },
416 FileFormatType::Json(protobuf::NdJsonFormat {
417 options
418 }) => {
419 let mut json = OtherNdJsonFormat::default();
420 if let Some(options) = options {
421 json = json.with_options(options.try_into()?)
422 }
423 Arc::new(json)
424 }
425 FileFormatType::Avro(..) => {
426 #[cfg(feature = "avro")]
427 {
428 Arc::new(AvroFormat)
429 }
430 #[cfg(not(feature = "avro"))]
431 {
432 panic!(
433 "Unable to process avro file since `avro` feature is not enabled"
434 );
435 }
436 }
437 FileFormatType::Arrow(..) => {
438 Arc::new(ArrowFormat)
439 }
440 };
441
442 let table_paths = &scan
443 .paths
444 .iter()
445 .map(ListingTableUrl::parse)
446 .collect::<Result<Vec<_>, _>>()?;
447
448 let partition_columns = scan
449 .table_partition_cols
450 .iter()
451 .map(|col| {
452 let Some(arrow_type) = col.arrow_type.as_ref() else {
453 return Err(proto_error(
454 "Missing Arrow type in partition columns",
455 ));
456 };
457 let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
458 proto_error(format!("Received an unknown ArrowType: {e}"))
459 })?;
460 Ok((col.name.clone(), arrow_type))
461 })
462 .collect::<Result<Vec<_>>>()?;
463
464 let options = ListingOptions::new(file_format)
465 .with_file_extension(&scan.file_extension)
466 .with_table_partition_cols(partition_columns)
467 .with_collect_stat(scan.collect_stat)
468 .with_target_partitions(scan.target_partitions as usize)
469 .with_file_sort_order(all_sort_orders);
470
471 let config =
472 ListingTableConfig::new_with_multi_paths(table_paths.clone())
473 .with_listing_options(options)
474 .with_schema(Arc::new(schema));
475
476 let provider = ListingTable::try_new(config)?.with_cache(
477 ctx.runtime_env().cache_manager.get_file_statistic_cache(),
478 );
479
480 let table_name =
481 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
482
483 let mut projection = None;
484 if let Some(columns) = &scan.projection {
485 let column_indices = columns
486 .columns
487 .iter()
488 .map(|name| provider.schema().index_of(name))
489 .collect::<Result<Vec<usize>, _>>()?;
490 projection = Some(column_indices);
491 }
492
493 LogicalPlanBuilder::scan_with_filters(
494 table_name,
495 provider_as_source(Arc::new(provider)),
496 projection,
497 filters,
498 )?
499 .build()
500 }
501 LogicalPlanType::CustomScan(scan) => {
502 let schema: Schema = convert_required!(scan.schema)?;
503 let schema = Arc::new(schema);
504 let mut projection = None;
505 if let Some(columns) = &scan.projection {
506 let column_indices = columns
507 .columns
508 .iter()
509 .map(|name| schema.index_of(name))
510 .collect::<Result<Vec<usize>, _>>()?;
511 projection = Some(column_indices);
512 }
513
514 let filters =
515 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
516
517 let table_name =
518 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
519
520 let provider = extension_codec.try_decode_table_provider(
521 &scan.custom_table_data,
522 &table_name,
523 schema,
524 ctx,
525 )?;
526
527 LogicalPlanBuilder::scan_with_filters(
528 table_name,
529 provider_as_source(provider),
530 projection,
531 filters,
532 )?
533 .build()
534 }
535 LogicalPlanType::Sort(sort) => {
536 let input: LogicalPlan =
537 into_logical_plan!(sort.input, ctx, extension_codec)?;
538 let sort_expr: Vec<SortExpr> =
539 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
540 let fetch: Option<usize> = sort.fetch.try_into().ok();
541 LogicalPlanBuilder::from(input)
542 .sort_with_limit(sort_expr, fetch)?
543 .build()
544 }
545 LogicalPlanType::Repartition(repartition) => {
546 use datafusion_expr::Partitioning;
547 let input: LogicalPlan =
548 into_logical_plan!(repartition.input, ctx, extension_codec)?;
549 use protobuf::repartition_node::PartitionMethod;
550 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
551 internal_datafusion_err!(
552 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
553 )
554 })?;
555
556 let partitioning_scheme = match pb_partition_method {
557 PartitionMethod::Hash(protobuf::HashRepartition {
558 hash_expr: pb_hash_expr,
559 partition_count,
560 }) => Partitioning::Hash(
561 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
562 *partition_count as usize,
563 ),
564 PartitionMethod::RoundRobin(partition_count) => {
565 Partitioning::RoundRobinBatch(*partition_count as usize)
566 }
567 };
568
569 LogicalPlanBuilder::from(input)
570 .repartition(partitioning_scheme)?
571 .build()
572 }
573 LogicalPlanType::EmptyRelation(empty_relation) => {
574 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
575 }
576 LogicalPlanType::CreateExternalTable(create_extern_table) => {
577 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
578 internal_datafusion_err!(
579 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
580 )
581 })?;
582
583 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
584 internal_datafusion_err!(
585 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
586 )
587 })?;
588 let definition = if !create_extern_table.definition.is_empty() {
589 Some(create_extern_table.definition.clone())
590 } else {
591 None
592 };
593
594 let mut order_exprs = vec![];
595 for expr in &create_extern_table.order_exprs {
596 order_exprs.push(from_proto::parse_sorts(
597 &expr.sort_expr_nodes,
598 ctx,
599 extension_codec,
600 )?);
601 }
602
603 let mut column_defaults =
604 HashMap::with_capacity(create_extern_table.column_defaults.len());
605 for (col_name, expr) in &create_extern_table.column_defaults {
606 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
607 column_defaults.insert(col_name.clone(), expr);
608 }
609
610 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
611 CreateExternalTable::builder(
612 from_table_reference(
613 create_extern_table.name.as_ref(),
614 "CreateExternalTable",
615 )?,
616 create_extern_table.location.clone(),
617 create_extern_table.file_type.clone(),
618 pb_schema.try_into()?,
619 )
620 .with_partition_cols(create_extern_table.table_partition_cols.clone())
621 .with_order_exprs(order_exprs)
622 .with_if_not_exists(create_extern_table.if_not_exists)
623 .with_or_replace(create_extern_table.or_replace)
624 .with_temporary(create_extern_table.temporary)
625 .with_definition(definition)
626 .with_unbounded(create_extern_table.unbounded)
627 .with_options(create_extern_table.options.clone())
628 .with_constraints(constraints.into())
629 .with_column_defaults(column_defaults)
630 .build(),
631 )))
632 }
633 LogicalPlanType::CreateView(create_view) => {
634 let plan = create_view
635 .input.clone().ok_or_else(|| internal_datafusion_err!(
636 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
637 ))?
638 .try_into_logical_plan(ctx, extension_codec)?;
639 let definition = if !create_view.definition.is_empty() {
640 Some(create_view.definition.clone())
641 } else {
642 None
643 };
644
645 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
646 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
647 temporary: create_view.temporary,
648 input: Arc::new(plan),
649 or_replace: create_view.or_replace,
650 definition,
651 })))
652 }
653 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
654 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
655 internal_datafusion_err!(
656 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
657 )
658 })?;
659
660 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
661 CreateCatalogSchema {
662 schema_name: create_catalog_schema.schema_name.clone(),
663 if_not_exists: create_catalog_schema.if_not_exists,
664 schema: pb_schema.try_into()?,
665 },
666 )))
667 }
668 LogicalPlanType::CreateCatalog(create_catalog) => {
669 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
670 internal_datafusion_err!(
671 "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
672 )
673 })?;
674
675 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
676 CreateCatalog {
677 catalog_name: create_catalog.catalog_name.clone(),
678 if_not_exists: create_catalog.if_not_exists,
679 schema: pb_schema.try_into()?,
680 },
681 )))
682 }
683 LogicalPlanType::Analyze(analyze) => {
684 let input: LogicalPlan =
685 into_logical_plan!(analyze.input, ctx, extension_codec)?;
686 LogicalPlanBuilder::from(input)
687 .explain(analyze.verbose, true)?
688 .build()
689 }
690 LogicalPlanType::Explain(explain) => {
691 let input: LogicalPlan =
692 into_logical_plan!(explain.input, ctx, extension_codec)?;
693 LogicalPlanBuilder::from(input)
694 .explain(explain.verbose, false)?
695 .build()
696 }
697 LogicalPlanType::SubqueryAlias(aliased_relation) => {
698 let input: LogicalPlan =
699 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
700 let alias = from_table_reference(
701 aliased_relation.alias.as_ref(),
702 "SubqueryAlias",
703 )?;
704 LogicalPlanBuilder::from(input).alias(alias)?.build()
705 }
706 LogicalPlanType::Limit(limit) => {
707 let input: LogicalPlan =
708 into_logical_plan!(limit.input, ctx, extension_codec)?;
709 let skip = limit.skip.max(0) as usize;
710
711 let fetch = if limit.fetch < 0 {
712 None
713 } else {
714 Some(limit.fetch as usize)
715 };
716
717 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
718 }
719 LogicalPlanType::Join(join) => {
720 let left_keys: Vec<Expr> =
721 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
722 let right_keys: Vec<Expr> =
723 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
724 let join_type =
725 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
726 proto_error(format!(
727 "Received a JoinNode message with unknown JoinType {}",
728 join.join_type
729 ))
730 })?;
731 let join_constraint = protobuf::JoinConstraint::try_from(
732 join.join_constraint,
733 )
734 .map_err(|_| {
735 proto_error(format!(
736 "Received a JoinNode message with unknown JoinConstraint {}",
737 join.join_constraint
738 ))
739 })?;
740 let filter: Option<Expr> = join
741 .filter
742 .as_ref()
743 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
744 .map_or(Ok(None), |v| v.map(Some))?;
745
746 let builder = LogicalPlanBuilder::from(into_logical_plan!(
747 join.left,
748 ctx,
749 extension_codec
750 )?);
751 let builder = match join_constraint.into() {
752 JoinConstraint::On => builder.join_with_expr_keys(
753 into_logical_plan!(join.right, ctx, extension_codec)?,
754 join_type.into(),
755 (left_keys, right_keys),
756 filter,
757 )?,
758 JoinConstraint::Using => {
759 let using_keys = left_keys
761 .into_iter()
762 .map(|key| {
763 key.try_as_col().cloned()
764 .ok_or_else(|| internal_datafusion_err!(
765 "Using join keys must be column references, got: {key:?}"
766 ))
767 })
768 .collect::<Result<Vec<_>, _>>()?;
769 builder.join_using(
770 into_logical_plan!(join.right, ctx, extension_codec)?,
771 join_type.into(),
772 using_keys,
773 )?
774 }
775 };
776
777 builder.build()
778 }
779 LogicalPlanType::Union(union) => {
780 assert_or_internal_err!(
781 union.inputs.len() >= 2,
782 "Protobuf deserialization error, Union requires at least two inputs."
783 );
784 let (first, rest) = union.inputs.split_first().unwrap();
785 let mut builder = LogicalPlanBuilder::from(
786 first.try_into_logical_plan(ctx, extension_codec)?,
787 );
788
789 for i in rest {
790 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
791 builder = builder.union(plan)?;
792 }
793 builder.build()
794 }
795 LogicalPlanType::CrossJoin(crossjoin) => {
796 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
797 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
798
799 LogicalPlanBuilder::from(left).cross_join(right)?.build()
800 }
801 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
802 let input_plans: Vec<LogicalPlan> = inputs
803 .iter()
804 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
805 .collect::<Result<_>>()?;
806
807 let extension_node =
808 extension_codec.try_decode(node, &input_plans, ctx)?;
809 Ok(LogicalPlan::Extension(extension_node))
810 }
811 LogicalPlanType::Distinct(distinct) => {
812 let input: LogicalPlan =
813 into_logical_plan!(distinct.input, ctx, extension_codec)?;
814 LogicalPlanBuilder::from(input).distinct()?.build()
815 }
816 LogicalPlanType::DistinctOn(distinct_on) => {
817 let input: LogicalPlan =
818 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
819 let on_expr =
820 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
821 let select_expr = from_proto::parse_exprs(
822 &distinct_on.select_expr,
823 ctx,
824 extension_codec,
825 )?;
826 let sort_expr = match distinct_on.sort_expr.len() {
827 0 => None,
828 _ => Some(from_proto::parse_sorts(
829 &distinct_on.sort_expr,
830 ctx,
831 extension_codec,
832 )?),
833 };
834 LogicalPlanBuilder::from(input)
835 .distinct_on(on_expr, select_expr, sort_expr)?
836 .build()
837 }
838 LogicalPlanType::ViewScan(scan) => {
839 let schema: Schema = convert_required!(scan.schema)?;
840
841 let mut projection = None;
842 if let Some(columns) = &scan.projection {
843 let column_indices = columns
844 .columns
845 .iter()
846 .map(|name| schema.index_of(name))
847 .collect::<Result<Vec<usize>, _>>()?;
848 projection = Some(column_indices);
849 }
850
851 let input: LogicalPlan =
852 into_logical_plan!(scan.input, ctx, extension_codec)?;
853
854 let definition = if !scan.definition.is_empty() {
855 Some(scan.definition.clone())
856 } else {
857 None
858 };
859
860 let provider = ViewTable::new(input, definition);
861
862 let table_name =
863 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
864
865 LogicalPlanBuilder::scan(
866 table_name,
867 provider_as_source(Arc::new(provider)),
868 projection,
869 )?
870 .build()
871 }
872 LogicalPlanType::Prepare(prepare) => {
873 let input: LogicalPlan =
874 into_logical_plan!(prepare.input, ctx, extension_codec)?;
875 let data_types: Vec<DataType> = prepare
876 .data_types
877 .iter()
878 .map(DataType::try_from)
879 .collect::<Result<_, _>>()?;
880 let fields: Vec<Field> = prepare
881 .fields
882 .iter()
883 .map(Field::try_from)
884 .collect::<Result<_, _>>()?;
885
886 if fields.is_empty() {
890 LogicalPlanBuilder::from(input)
891 .prepare(
892 prepare.name.clone(),
893 data_types
894 .into_iter()
895 .map(|dt| Field::new("", dt, true).into())
896 .collect(),
897 )?
898 .build()
899 } else {
900 LogicalPlanBuilder::from(input)
901 .prepare(
902 prepare.name.clone(),
903 fields.into_iter().map(|f| f.into()).collect(),
904 )?
905 .build()
906 }
907 }
908 LogicalPlanType::DropView(dropview) => {
909 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
910 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
911 if_exists: dropview.if_exists,
912 schema: Arc::new(convert_required!(dropview.schema)?),
913 })))
914 }
915 LogicalPlanType::CopyTo(copy) => {
916 let input: LogicalPlan =
917 into_logical_plan!(copy.input, ctx, extension_codec)?;
918
919 let file_type: Arc<dyn FileType> = format_as_file_type(
920 extension_codec.try_decode_file_format(©.file_type, ctx)?,
921 );
922
923 Ok(LogicalPlan::Copy(dml::CopyTo::new(
924 Arc::new(input),
925 copy.output_url.clone(),
926 copy.partition_by.clone(),
927 file_type,
928 Default::default(),
929 )))
930 }
931 LogicalPlanType::Unnest(unnest) => {
932 let input: LogicalPlan =
933 into_logical_plan!(unnest.input, ctx, extension_codec)?;
934
935 LogicalPlanBuilder::from(input)
936 .unnest_columns_with_options(
937 unnest.exec_columns.iter().map(|c| c.into()).collect(),
938 into_required!(unnest.options)?,
939 )?
940 .build()
941 }
942 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
943 let static_term = recursive_query_node
944 .static_term
945 .as_ref()
946 .ok_or_else(|| internal_datafusion_err!(
947 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
948 ))?
949 .try_into_logical_plan(ctx, extension_codec)?;
950
951 let recursive_term = recursive_query_node
952 .recursive_term
953 .as_ref()
954 .ok_or_else(|| internal_datafusion_err!(
955 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
956 ))?
957 .try_into_logical_plan(ctx, extension_codec)?;
958
959 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
960 name: recursive_query_node.name.clone(),
961 static_term: Arc::new(static_term),
962 recursive_term: Arc::new(recursive_term),
963 is_distinct: recursive_query_node.is_distinct,
964 }))
965 }
966 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
967 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
968 let schema = convert_required!(*schema)?;
969 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
970 LogicalPlanBuilder::scan(
971 name.as_str(),
972 provider_as_source(Arc::new(cte_work_table)),
973 None,
974 )?
975 .build()
976 }
977 LogicalPlanType::Dml(dml_node) => {
978 Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
979 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
980 to_table_source(&dml_node.target, ctx, extension_codec)?,
981 dml_node.dml_type().into(),
982 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
983 )))
984 }
985 }
986 }
987
988 fn try_from_logical_plan(
989 plan: &LogicalPlan,
990 extension_codec: &dyn LogicalExtensionCodec,
991 ) -> Result<Self>
992 where
993 Self: Sized,
994 {
995 match plan {
996 LogicalPlan::Values(Values { values, .. }) => {
997 let n_cols = if values.is_empty() {
998 0
999 } else {
1000 values[0].len()
1001 } as u64;
1002 let values_list =
1003 serialize_exprs(values.iter().flatten(), extension_codec)?;
1004 Ok(LogicalPlanNode {
1005 logical_plan_type: Some(LogicalPlanType::Values(
1006 protobuf::ValuesNode {
1007 n_cols,
1008 values_list,
1009 },
1010 )),
1011 })
1012 }
1013 LogicalPlan::TableScan(TableScan {
1014 table_name,
1015 source,
1016 filters,
1017 projection,
1018 ..
1019 }) => {
1020 let provider = source_as_provider(source)?;
1021 let schema = provider.schema();
1022 let source = provider.as_any();
1023
1024 let projection = match projection {
1025 None => None,
1026 Some(columns) => {
1027 let column_names = columns
1028 .iter()
1029 .map(|i| schema.field(*i).name().to_owned())
1030 .collect();
1031 Some(protobuf::ProjectionColumns {
1032 columns: column_names,
1033 })
1034 }
1035 };
1036
1037 let filters: Vec<protobuf::LogicalExprNode> =
1038 serialize_exprs(filters, extension_codec)?;
1039
1040 if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
1041 let any = listing_table.options().format.as_any();
1042 let file_format_type = {
1043 let mut maybe_some_type = None;
1044
1045 #[cfg(feature = "parquet")]
1046 if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
1047 let options = parquet.options();
1048 maybe_some_type =
1049 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1050 options: Some(options.try_into()?),
1051 }));
1052 };
1053
1054 if let Some(csv) = any.downcast_ref::<CsvFormat>() {
1055 let options = csv.options();
1056 maybe_some_type =
1057 Some(FileFormatType::Csv(protobuf::CsvFormat {
1058 options: Some(options.try_into()?),
1059 }));
1060 }
1061
1062 if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
1063 let options = json.options();
1064 maybe_some_type =
1065 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1066 options: Some(options.try_into()?),
1067 }))
1068 }
1069
1070 #[cfg(feature = "avro")]
1071 if any.is::<AvroFormat>() {
1072 maybe_some_type =
1073 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1074 }
1075
1076 if any.is::<ArrowFormat>() {
1077 maybe_some_type =
1078 Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1079 }
1080
1081 if let Some(file_format_type) = maybe_some_type {
1082 file_format_type
1083 } else {
1084 return Err(proto_error(format!(
1085 "Error deserializing unknown file format: {:?}",
1086 listing_table.options().format
1087 )));
1088 }
1089 };
1090
1091 let options = listing_table.options();
1092
1093 let mut builder = SchemaBuilder::from(schema.as_ref());
1094 for (idx, field) in schema.fields().iter().enumerate().rev() {
1095 if options
1096 .table_partition_cols
1097 .iter()
1098 .any(|(name, _)| name == field.name())
1099 {
1100 builder.remove(idx);
1101 }
1102 }
1103
1104 let schema = builder.finish();
1105
1106 let schema: protobuf::Schema = (&schema).try_into()?;
1107
1108 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1109 for order in &options.file_sort_order {
1110 let expr_vec = SortExprNodeCollection {
1111 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1112 };
1113 exprs_vec.push(expr_vec);
1114 }
1115
1116 let partition_columns = options
1117 .table_partition_cols
1118 .iter()
1119 .map(|(name, arrow_type)| {
1120 let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1121 .map_err(|e| {
1122 proto_error(format!(
1123 "Received an unknown ArrowType: {e}"
1124 ))
1125 })?;
1126 Ok(protobuf::PartitionColumn {
1127 name: name.clone(),
1128 arrow_type: Some(arrow_type),
1129 })
1130 })
1131 .collect::<Result<Vec<_>>>()?;
1132
1133 Ok(LogicalPlanNode {
1134 logical_plan_type: Some(LogicalPlanType::ListingScan(
1135 protobuf::ListingTableScanNode {
1136 file_format_type: Some(file_format_type),
1137 table_name: Some(table_name.clone().into()),
1138 collect_stat: options.collect_stat,
1139 file_extension: options.file_extension.clone(),
1140 table_partition_cols: partition_columns,
1141 paths: listing_table
1142 .table_paths()
1143 .iter()
1144 .map(|x| x.to_string())
1145 .collect(),
1146 schema: Some(schema),
1147 projection,
1148 filters,
1149 target_partitions: options.target_partitions as u32,
1150 file_sort_order: exprs_vec,
1151 },
1152 )),
1153 })
1154 } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
1155 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1156 Ok(LogicalPlanNode {
1157 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1158 protobuf::ViewTableScanNode {
1159 table_name: Some(table_name.clone().into()),
1160 input: Some(Box::new(
1161 LogicalPlanNode::try_from_logical_plan(
1162 view_table.logical_plan(),
1163 extension_codec,
1164 )?,
1165 )),
1166 schema: Some(schema),
1167 projection,
1168 definition: view_table
1169 .definition()
1170 .map(|s| s.to_string())
1171 .unwrap_or_default(),
1172 },
1173 ))),
1174 })
1175 } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
1176 {
1177 let name = cte_work_table.name().to_string();
1178 let schema = cte_work_table.schema();
1179 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1180
1181 Ok(LogicalPlanNode {
1182 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1183 protobuf::CteWorkTableScanNode {
1184 name,
1185 schema: Some(schema),
1186 },
1187 )),
1188 })
1189 } else {
1190 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1191 let mut bytes = vec![];
1192 extension_codec
1193 .try_encode_table_provider(table_name, provider, &mut bytes)
1194 .map_err(|e| context!("Error serializing custom table", e))?;
1195 let scan = CustomScan(CustomTableScanNode {
1196 table_name: Some(table_name.clone().into()),
1197 projection,
1198 schema: Some(schema),
1199 filters,
1200 custom_table_data: bytes,
1201 });
1202 let node = LogicalPlanNode {
1203 logical_plan_type: Some(scan),
1204 };
1205 Ok(node)
1206 }
1207 }
1208 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1209 Ok(LogicalPlanNode {
1210 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1211 protobuf::ProjectionNode {
1212 input: Some(Box::new(
1213 LogicalPlanNode::try_from_logical_plan(
1214 input.as_ref(),
1215 extension_codec,
1216 )?,
1217 )),
1218 expr: serialize_exprs(expr, extension_codec)?,
1219 optional_alias: None,
1220 },
1221 ))),
1222 })
1223 }
1224 LogicalPlan::Filter(filter) => {
1225 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1226 filter.input.as_ref(),
1227 extension_codec,
1228 )?;
1229 Ok(LogicalPlanNode {
1230 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1231 protobuf::SelectionNode {
1232 input: Some(Box::new(input)),
1233 expr: Some(serialize_expr(
1234 &filter.predicate,
1235 extension_codec,
1236 )?),
1237 },
1238 ))),
1239 })
1240 }
1241 LogicalPlan::Distinct(Distinct::All(input)) => {
1242 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1243 input.as_ref(),
1244 extension_codec,
1245 )?;
1246 Ok(LogicalPlanNode {
1247 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1248 protobuf::DistinctNode {
1249 input: Some(Box::new(input)),
1250 },
1251 ))),
1252 })
1253 }
1254 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1255 on_expr,
1256 select_expr,
1257 sort_expr,
1258 input,
1259 ..
1260 })) => {
1261 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1262 input.as_ref(),
1263 extension_codec,
1264 )?;
1265 let sort_expr = match sort_expr {
1266 None => vec![],
1267 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1268 };
1269 Ok(LogicalPlanNode {
1270 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1271 protobuf::DistinctOnNode {
1272 on_expr: serialize_exprs(on_expr, extension_codec)?,
1273 select_expr: serialize_exprs(select_expr, extension_codec)?,
1274 sort_expr,
1275 input: Some(Box::new(input)),
1276 },
1277 ))),
1278 })
1279 }
1280 LogicalPlan::Window(Window {
1281 input, window_expr, ..
1282 }) => {
1283 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1284 input.as_ref(),
1285 extension_codec,
1286 )?;
1287 Ok(LogicalPlanNode {
1288 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1289 protobuf::WindowNode {
1290 input: Some(Box::new(input)),
1291 window_expr: serialize_exprs(window_expr, extension_codec)?,
1292 },
1293 ))),
1294 })
1295 }
1296 LogicalPlan::Aggregate(Aggregate {
1297 group_expr,
1298 aggr_expr,
1299 input,
1300 ..
1301 }) => {
1302 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1303 input.as_ref(),
1304 extension_codec,
1305 )?;
1306 Ok(LogicalPlanNode {
1307 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1308 protobuf::AggregateNode {
1309 input: Some(Box::new(input)),
1310 group_expr: serialize_exprs(group_expr, extension_codec)?,
1311 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1312 },
1313 ))),
1314 })
1315 }
1316 LogicalPlan::Join(Join {
1317 left,
1318 right,
1319 on,
1320 filter,
1321 join_type,
1322 join_constraint,
1323 null_equality,
1324 ..
1325 }) => {
1326 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1327 left.as_ref(),
1328 extension_codec,
1329 )?;
1330 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1331 right.as_ref(),
1332 extension_codec,
1333 )?;
1334 let (left_join_key, right_join_key) = on
1335 .iter()
1336 .map(|(l, r)| {
1337 Ok((
1338 serialize_expr(l, extension_codec)?,
1339 serialize_expr(r, extension_codec)?,
1340 ))
1341 })
1342 .collect::<Result<Vec<_>, ToProtoError>>()?
1343 .into_iter()
1344 .unzip();
1345 let join_type: protobuf::JoinType = join_type.to_owned().into();
1346 let join_constraint: protobuf::JoinConstraint =
1347 join_constraint.to_owned().into();
1348 let null_equality: protobuf::NullEquality =
1349 null_equality.to_owned().into();
1350 let filter = filter
1351 .as_ref()
1352 .map(|e| serialize_expr(e, extension_codec))
1353 .map_or(Ok(None), |v| v.map(Some))?;
1354 Ok(LogicalPlanNode {
1355 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1356 protobuf::JoinNode {
1357 left: Some(Box::new(left)),
1358 right: Some(Box::new(right)),
1359 join_type: join_type.into(),
1360 join_constraint: join_constraint.into(),
1361 left_join_key,
1362 right_join_key,
1363 null_equality: null_equality.into(),
1364 filter,
1365 },
1366 ))),
1367 })
1368 }
1369 LogicalPlan::Subquery(_) => {
1370 not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
1371 }
1372 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1373 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1374 input.as_ref(),
1375 extension_codec,
1376 )?;
1377 Ok(LogicalPlanNode {
1378 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1379 protobuf::SubqueryAliasNode {
1380 input: Some(Box::new(input)),
1381 alias: Some((*alias).clone().into()),
1382 },
1383 ))),
1384 })
1385 }
1386 LogicalPlan::Limit(limit) => {
1387 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1388 limit.input.as_ref(),
1389 extension_codec,
1390 )?;
1391 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1392 return Err(proto_error(
1393 "LogicalPlan::Limit only supports literal skip values",
1394 ));
1395 };
1396 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1397 return Err(proto_error(
1398 "LogicalPlan::Limit only supports literal fetch values",
1399 ));
1400 };
1401
1402 Ok(LogicalPlanNode {
1403 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1404 protobuf::LimitNode {
1405 input: Some(Box::new(input)),
1406 skip: skip as i64,
1407 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1408 },
1409 ))),
1410 })
1411 }
1412 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1413 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414 input.as_ref(),
1415 extension_codec,
1416 )?;
1417 let sort_expr: Vec<protobuf::SortExprNode> =
1418 serialize_sorts(expr, extension_codec)?;
1419 Ok(LogicalPlanNode {
1420 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1421 protobuf::SortNode {
1422 input: Some(Box::new(input)),
1423 expr: sort_expr,
1424 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1425 },
1426 ))),
1427 })
1428 }
1429 LogicalPlan::Repartition(Repartition {
1430 input,
1431 partitioning_scheme,
1432 }) => {
1433 use datafusion_expr::Partitioning;
1434 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1435 input.as_ref(),
1436 extension_codec,
1437 )?;
1438
1439 use protobuf::repartition_node::PartitionMethod;
1442
1443 let pb_partition_method = match partitioning_scheme {
1444 Partitioning::Hash(exprs, partition_count) => {
1445 PartitionMethod::Hash(protobuf::HashRepartition {
1446 hash_expr: serialize_exprs(exprs, extension_codec)?,
1447 partition_count: *partition_count as u64,
1448 })
1449 }
1450 Partitioning::RoundRobinBatch(partition_count) => {
1451 PartitionMethod::RoundRobin(*partition_count as u64)
1452 }
1453 Partitioning::DistributeBy(_) => {
1454 return not_impl_err!("DistributeBy");
1455 }
1456 };
1457
1458 Ok(LogicalPlanNode {
1459 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1460 protobuf::RepartitionNode {
1461 input: Some(Box::new(input)),
1462 partition_method: Some(pb_partition_method),
1463 },
1464 ))),
1465 })
1466 }
1467 LogicalPlan::EmptyRelation(EmptyRelation {
1468 produce_one_row, ..
1469 }) => Ok(LogicalPlanNode {
1470 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1471 protobuf::EmptyRelationNode {
1472 produce_one_row: *produce_one_row,
1473 },
1474 )),
1475 }),
1476 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1477 CreateExternalTable {
1478 name,
1479 location,
1480 file_type,
1481 schema: df_schema,
1482 table_partition_cols,
1483 if_not_exists,
1484 or_replace,
1485 definition,
1486 order_exprs,
1487 unbounded,
1488 options,
1489 constraints,
1490 column_defaults,
1491 temporary,
1492 },
1493 )) => {
1494 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1495 for order in order_exprs {
1496 let temp = SortExprNodeCollection {
1497 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1498 };
1499 converted_order_exprs.push(temp);
1500 }
1501
1502 let mut converted_column_defaults =
1503 HashMap::with_capacity(column_defaults.len());
1504 for (col_name, expr) in column_defaults {
1505 converted_column_defaults
1506 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1507 }
1508
1509 Ok(LogicalPlanNode {
1510 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1511 protobuf::CreateExternalTableNode {
1512 name: Some(name.clone().into()),
1513 location: location.clone(),
1514 file_type: file_type.clone(),
1515 schema: Some(df_schema.try_into()?),
1516 table_partition_cols: table_partition_cols.clone(),
1517 if_not_exists: *if_not_exists,
1518 or_replace: *or_replace,
1519 temporary: *temporary,
1520 order_exprs: converted_order_exprs,
1521 definition: definition.clone().unwrap_or_default(),
1522 unbounded: *unbounded,
1523 options: options.clone(),
1524 constraints: Some(constraints.clone().into()),
1525 column_defaults: converted_column_defaults,
1526 },
1527 )),
1528 })
1529 }
1530 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1531 name,
1532 input,
1533 or_replace,
1534 definition,
1535 temporary,
1536 })) => Ok(LogicalPlanNode {
1537 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1538 protobuf::CreateViewNode {
1539 name: Some(name.clone().into()),
1540 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1541 input,
1542 extension_codec,
1543 )?)),
1544 or_replace: *or_replace,
1545 temporary: *temporary,
1546 definition: definition.clone().unwrap_or_default(),
1547 },
1548 ))),
1549 }),
1550 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1551 CreateCatalogSchema {
1552 schema_name,
1553 if_not_exists,
1554 schema: df_schema,
1555 },
1556 )) => Ok(LogicalPlanNode {
1557 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1558 protobuf::CreateCatalogSchemaNode {
1559 schema_name: schema_name.clone(),
1560 if_not_exists: *if_not_exists,
1561 schema: Some(df_schema.try_into()?),
1562 },
1563 )),
1564 }),
1565 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1566 catalog_name,
1567 if_not_exists,
1568 schema: df_schema,
1569 })) => Ok(LogicalPlanNode {
1570 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1571 protobuf::CreateCatalogNode {
1572 catalog_name: catalog_name.clone(),
1573 if_not_exists: *if_not_exists,
1574 schema: Some(df_schema.try_into()?),
1575 },
1576 )),
1577 }),
1578 LogicalPlan::Analyze(a) => {
1579 let input = LogicalPlanNode::try_from_logical_plan(
1580 a.input.as_ref(),
1581 extension_codec,
1582 )?;
1583 Ok(LogicalPlanNode {
1584 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1585 protobuf::AnalyzeNode {
1586 input: Some(Box::new(input)),
1587 verbose: a.verbose,
1588 },
1589 ))),
1590 })
1591 }
1592 LogicalPlan::Explain(a) => {
1593 let input = LogicalPlanNode::try_from_logical_plan(
1594 a.plan.as_ref(),
1595 extension_codec,
1596 )?;
1597 Ok(LogicalPlanNode {
1598 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1599 protobuf::ExplainNode {
1600 input: Some(Box::new(input)),
1601 verbose: a.verbose,
1602 },
1603 ))),
1604 })
1605 }
1606 LogicalPlan::Union(union) => {
1607 let inputs: Vec<LogicalPlanNode> = union
1608 .inputs
1609 .iter()
1610 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1611 .collect::<Result<_>>()?;
1612 Ok(LogicalPlanNode {
1613 logical_plan_type: Some(LogicalPlanType::Union(
1614 protobuf::UnionNode { inputs },
1615 )),
1616 })
1617 }
1618 LogicalPlan::Extension(extension) => {
1619 let mut buf: Vec<u8> = vec![];
1620 extension_codec.try_encode(extension, &mut buf)?;
1621
1622 let inputs: Vec<LogicalPlanNode> = extension
1623 .node
1624 .inputs()
1625 .iter()
1626 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1627 .collect::<Result<_>>()?;
1628
1629 Ok(LogicalPlanNode {
1630 logical_plan_type: Some(LogicalPlanType::Extension(
1631 LogicalExtensionNode { node: buf, inputs },
1632 )),
1633 })
1634 }
1635 LogicalPlan::Statement(Statement::Prepare(Prepare {
1636 name,
1637 fields,
1638 input,
1639 })) => {
1640 let input =
1641 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1642 Ok(LogicalPlanNode {
1643 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1644 protobuf::PrepareNode {
1645 name: name.clone(),
1646 input: Some(Box::new(input)),
1647 data_types: fields
1649 .iter()
1650 .map(|f| f.data_type().try_into())
1651 .collect::<Result<Vec<_>, _>>()?,
1652 fields: fields
1654 .iter()
1655 .map(|f| f.as_ref().try_into())
1656 .collect::<Result<Vec<_>, _>>()?,
1657 },
1658 ))),
1659 })
1660 }
1661 LogicalPlan::Unnest(Unnest {
1662 input,
1663 exec_columns,
1664 list_type_columns,
1665 struct_type_columns,
1666 dependency_indices,
1667 schema,
1668 options,
1669 }) => {
1670 let input =
1671 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1672 let proto_unnest_list_items = list_type_columns
1673 .iter()
1674 .map(|(index, ul)| ColumnUnnestListItem {
1675 input_index: *index as _,
1676 recursion: Some(ColumnUnnestListRecursion {
1677 output_column: Some(ul.output_column.to_owned().into()),
1678 depth: ul.depth as _,
1679 }),
1680 })
1681 .collect();
1682 Ok(LogicalPlanNode {
1683 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1684 protobuf::UnnestNode {
1685 input: Some(Box::new(input)),
1686 exec_columns: exec_columns
1687 .iter()
1688 .map(|col| col.into())
1689 .collect(),
1690 list_type_columns: proto_unnest_list_items,
1691 struct_type_columns: struct_type_columns
1692 .iter()
1693 .map(|c| *c as u64)
1694 .collect(),
1695 dependency_indices: dependency_indices
1696 .iter()
1697 .map(|c| *c as u64)
1698 .collect(),
1699 schema: Some(schema.try_into()?),
1700 options: Some(options.into()),
1701 },
1702 ))),
1703 })
1704 }
1705 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1706 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1707 )),
1708 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1709 "LogicalPlan serde is not yet implemented for CreateIndex",
1710 )),
1711 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1712 "LogicalPlan serde is not yet implemented for DropTable",
1713 )),
1714 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1715 name,
1716 if_exists,
1717 schema,
1718 })) => Ok(LogicalPlanNode {
1719 logical_plan_type: Some(LogicalPlanType::DropView(
1720 protobuf::DropViewNode {
1721 name: Some(name.clone().into()),
1722 if_exists: *if_exists,
1723 schema: Some(schema.try_into()?),
1724 },
1725 )),
1726 }),
1727 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1728 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1729 )),
1730 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1731 "LogicalPlan serde is not yet implemented for CreateFunction",
1732 )),
1733 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1734 "LogicalPlan serde is not yet implemented for DropFunction",
1735 )),
1736 LogicalPlan::Statement(_) => Err(proto_error(
1737 "LogicalPlan serde is not yet implemented for Statement",
1738 )),
1739 LogicalPlan::Dml(DmlStatement {
1740 table_name,
1741 target,
1742 op,
1743 input,
1744 ..
1745 }) => {
1746 let input =
1747 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1748 let dml_type: dml_node::Type = op.into();
1749 Ok(LogicalPlanNode {
1750 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1751 input: Some(Box::new(input)),
1752 target: Some(Box::new(from_table_source(
1753 table_name.clone(),
1754 Arc::clone(target),
1755 extension_codec,
1756 )?)),
1757 table_name: Some(table_name.clone().into()),
1758 dml_type: dml_type.into(),
1759 }))),
1760 })
1761 }
1762 LogicalPlan::Copy(dml::CopyTo {
1763 input,
1764 output_url,
1765 file_type,
1766 partition_by,
1767 ..
1768 }) => {
1769 let input =
1770 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1771 let mut buf = Vec::new();
1772 extension_codec
1773 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1774
1775 Ok(LogicalPlanNode {
1776 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1777 protobuf::CopyToNode {
1778 input: Some(Box::new(input)),
1779 output_url: output_url.to_string(),
1780 file_type: buf,
1781 partition_by: partition_by.clone(),
1782 },
1783 ))),
1784 })
1785 }
1786 LogicalPlan::DescribeTable(_) => Err(proto_error(
1787 "LogicalPlan serde is not yet implemented for DescribeTable",
1788 )),
1789 LogicalPlan::RecursiveQuery(recursive) => {
1790 let static_term = LogicalPlanNode::try_from_logical_plan(
1791 recursive.static_term.as_ref(),
1792 extension_codec,
1793 )?;
1794 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1795 recursive.recursive_term.as_ref(),
1796 extension_codec,
1797 )?;
1798
1799 Ok(LogicalPlanNode {
1800 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1801 protobuf::RecursiveQueryNode {
1802 name: recursive.name.clone(),
1803 static_term: Some(Box::new(static_term)),
1804 recursive_term: Some(Box::new(recursive_term)),
1805 is_distinct: recursive.is_distinct,
1806 },
1807 ))),
1808 })
1809 }
1810 }
1811 }
1812}