1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 dml_node, ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, listing_table_scan_node::FileFormatType,
31 logical_plan_node::LogicalPlanType, LogicalExtensionNode, LogicalPlanNode,
32 },
33};
34
35use crate::protobuf::{proto_error, ToProtoError};
36use arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef};
37use datafusion::datasource::cte_worktable::CteWorkTable;
38#[cfg(feature = "avro")]
39use datafusion::datasource::file_format::avro::AvroFormat;
40#[cfg(feature = "parquet")]
41use datafusion::datasource::file_format::parquet::ParquetFormat;
42use datafusion::datasource::file_format::{
43 file_type_to_format, format_as_file_type, FileFormatFactory,
44};
45use datafusion::{
46 datasource::{
47 file_format::{
48 csv::CsvFormat, json::JsonFormat as OtherNdJsonFormat, FileFormat,
49 },
50 listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
51 view::ViewTable,
52 TableProvider,
53 },
54 datasource::{provider_as_source, source_as_provider},
55 prelude::SessionContext,
56};
57use datafusion_common::file_options::file_type::FileType;
58use datafusion_common::{
59 context, internal_datafusion_err, internal_err, not_impl_err, plan_err,
60 DataFusionError, Result, TableReference, ToDFSchema,
61};
62use datafusion_expr::{
63 dml,
64 logical_plan::{
65 builder::project, Aggregate, CreateCatalog, CreateCatalogSchema,
66 CreateExternalTable, CreateView, DdlStatement, Distinct, EmptyRelation,
67 Extension, Join, JoinConstraint, Prepare, Projection, Repartition, Sort,
68 SubqueryAlias, TableScan, Values, Window,
69 },
70 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
71 Statement, WindowUDF,
72};
73use datafusion_expr::{
74 AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
75};
76
77use self::to_proto::{serialize_expr, serialize_exprs};
78use crate::logical_plan::to_proto::serialize_sorts;
79use prost::bytes::BufMut;
80use prost::Message;
81
82pub mod file_formats;
83pub mod from_proto;
84pub mod to_proto;
85
/// Bidirectional conversion between a protobuf plan representation and a
/// DataFusion [`LogicalPlan`].
pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    /// Decodes `Self` from protobuf-encoded bytes.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` as protobuf bytes into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this protobuf node into a [`LogicalPlan`], resolving tables,
    /// functions, and user-defined constructs via `ctx` and `extension_codec`.
    fn try_into_logical_plan(
        &self,
        ctx: &SessionContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan>;

    /// Converts a [`LogicalPlan`] into this protobuf representation,
    /// serializing any user-defined constructs with `extension_codec`.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
109
/// Serializes and deserializes user-defined plan constructs — extension
/// nodes, custom table providers, file format factories, and UDFs — that the
/// built-in protobuf schema cannot represent.
///
/// The provided `try_decode_*` defaults return a "not implemented" error and
/// the `try_encode_*` defaults write nothing, so plans containing only
/// built-in constructs round-trip without any codec work.
pub trait LogicalExtensionCodec: Debug + Send + Sync {
    /// Decodes a user-defined [`Extension`] node from `buf`; `inputs` are the
    /// already-decoded child plans.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[LogicalPlan],
        ctx: &SessionContext,
    ) -> Result<Extension>;

    /// Serializes a user-defined [`Extension`] node into `buf`.
    fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes a custom [`TableProvider`] previously written by
    /// [`Self::try_encode_table_provider`].
    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        table_ref: &TableReference,
        schema: SchemaRef,
        ctx: &SessionContext,
    ) -> Result<Arc<dyn TableProvider>>;

    /// Serializes a custom [`TableProvider`] into `buf`.
    fn try_encode_table_provider(
        &self,
        table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<()>;

    /// Decodes a [`FileFormatFactory`] from `buf`. Default: unimplemented.
    fn try_decode_file_format(
        &self,
        _buf: &[u8],
        _ctx: &SessionContext,
    ) -> Result<Arc<dyn FileFormatFactory>> {
        not_impl_err!("LogicalExtensionCodec is not provided for file format")
    }

    /// Serializes a [`FileFormatFactory`] into `_buf`. Default: writes nothing.
    fn try_encode_file_format(
        &self,
        _buf: &mut Vec<u8>,
        _node: Arc<dyn FileFormatFactory>,
    ) -> Result<()> {
        Ok(())
    }

    /// Decodes the scalar UDF registered as `name`. Default: unimplemented.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Serializes a scalar UDF's payload. Default: writes nothing.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the aggregate UDF registered as `name`. Default: unimplemented.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "LogicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Serializes an aggregate UDF's payload. Default: writes nothing.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF registered as `name`. Default: unimplemented.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
    }

    /// Serializes a window UDF's payload. Default: writes nothing.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
177
/// A [`LogicalExtensionCodec`] that supports no user-defined constructs:
/// every required method returns a "not implemented" error. Suitable when
/// plans are known to contain only built-in nodes and providers.
#[derive(Debug, Clone)]
pub struct DefaultLogicalExtensionCodec {}

impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
    // Extension nodes cannot be decoded without a user-supplied codec.
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[LogicalPlan],
        _ctx: &SessionContext,
    ) -> Result<Extension> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    // Extension nodes cannot be encoded without a user-supplied codec.
    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    // Custom table providers cannot be decoded without a user-supplied codec.
    fn try_decode_table_provider(
        &self,
        _buf: &[u8],
        _table_ref: &TableReference,
        _schema: SchemaRef,
        _ctx: &SessionContext,
    ) -> Result<Arc<dyn TableProvider>> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }

    // Custom table providers cannot be encoded without a user-supplied codec.
    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        _node: Arc<dyn TableProvider>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("LogicalExtensionCodec is not provided")
    }
}
214
/// Decodes an optional boxed child-plan field of a protobuf message,
/// producing an error when the field is absent.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        match $PB.as_ref() {
            Some(field) => field.as_ref().try_into_logical_plan($CTX, $CODEC),
            None => Err(proto_error("Missing required field in protobuf")),
        }
    }};
}
225
226fn from_table_reference(
227 table_ref: Option<&protobuf::TableReference>,
228 error_context: &str,
229) -> Result<TableReference> {
230 let table_ref = table_ref.ok_or_else(|| {
231 DataFusionError::Internal(format!(
232 "Protobuf deserialization error, {error_context} was missing required field name."
233 ))
234 })?;
235
236 Ok(table_ref.clone().try_into()?)
237}
238
239fn to_table_source(
243 node: &Option<Box<LogicalPlanNode>>,
244 ctx: &SessionContext,
245 extension_codec: &dyn LogicalExtensionCodec,
246) -> Result<Arc<dyn TableSource>> {
247 if let Some(node) = node {
248 match node.try_into_logical_plan(ctx, extension_codec)? {
249 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
250 _ => plan_err!("expected TableScan node"),
251 }
252 } else {
253 plan_err!("LogicalPlanNode should be provided")
254 }
255}
256
257fn from_table_source(
261 table_name: TableReference,
262 target: Arc<dyn TableSource>,
263 extension_codec: &dyn LogicalExtensionCodec,
264) -> Result<LogicalPlanNode> {
265 let projected_schema = target.schema().to_dfschema_ref()?;
266 let r = LogicalPlan::TableScan(TableScan {
267 table_name,
268 source: target,
269 projection: None,
270 projected_schema,
271 filters: vec![],
272 fetch: None,
273 });
274
275 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
276}
277
278impl AsLogicalPlan for LogicalPlanNode {
279 fn try_decode(buf: &[u8]) -> Result<Self>
280 where
281 Self: Sized,
282 {
283 LogicalPlanNode::decode(buf).map_err(|e| {
284 DataFusionError::Internal(format!("failed to decode logical plan: {e:?}"))
285 })
286 }
287
288 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
289 where
290 B: BufMut,
291 Self: Sized,
292 {
293 self.encode(buf).map_err(|e| {
294 DataFusionError::Internal(format!("failed to encode logical plan: {e:?}"))
295 })
296 }
297
    /// Deserializes this protobuf node into a [`LogicalPlan`].
    ///
    /// Dispatches on the `logical_plan_type` oneof. Child plans are decoded
    /// recursively, expressions and sort keys via `from_proto`, and custom
    /// table providers, file formats, and extension nodes via `extension_codec`.
    ///
    /// # Errors
    /// Returns an error when the oneof is unset, a required field is missing,
    /// or any nested expression/plan fails to decode.
    fn try_into_logical_plan(
        &self,
        ctx: &SessionContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan> {
        // The oneof must be populated; otherwise the message is unsupported
        // (or was produced by an incompatible serializer version).
        let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
            ))
        })?;
        match plan {
            // VALUES: rows are stored flattened; regroup into rows of `n_cols`.
            LogicalPlanType::Values(values) => {
                let n_cols = values.n_cols as usize;
                let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
                    Ok(Vec::new())
                } else if values.values_list.len() % n_cols != 0 {
                    // The flat list must split evenly into rows.
                    internal_err!(
                        "Invalid values list length, expect {} to be divisible by {}",
                        values.values_list.len(),
                        n_cols
                    )
                } else {
                    values
                        .values_list
                        .chunks_exact(n_cols)
                        .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
                        .collect::<Result<Vec<_>, _>>()
                        .map_err(|e| e.into())
                }?;

                LogicalPlanBuilder::values(values)?.build()
            }
            // Projection; re-wrapped in a SubqueryAlias when an alias was recorded.
            LogicalPlanType::Projection(projection) => {
                let input: LogicalPlan =
                    into_logical_plan!(projection.input, ctx, extension_codec)?;
                let expr: Vec<Expr> =
                    from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;

                let new_proj = project(input, expr)?;
                match projection.optional_alias.as_ref() {
                    Some(a) => match a {
                        protobuf::projection_node::OptionalAlias::Alias(alias) => {
                            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
                                Arc::new(new_proj),
                                alias.clone(),
                            )?))
                        }
                    },
                    _ => Ok(new_proj),
                }
            }
            // Filter (historically named "Selection"); the predicate is required.
            LogicalPlanType::Selection(selection) => {
                let input: LogicalPlan =
                    into_logical_plan!(selection.input, ctx, extension_codec)?;
                let expr: Expr = selection
                    .expr
                    .as_ref()
                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
                    .transpose()?
                    .ok_or_else(|| proto_error("expression required"))?;
                LogicalPlanBuilder::from(input).filter(expr)?.build()
            }
            // Window functions over the decoded input.
            LogicalPlanType::Window(window) => {
                let input: LogicalPlan =
                    into_logical_plan!(window.input, ctx, extension_codec)?;
                let window_expr =
                    from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input).window(window_expr)?.build()
            }
            // GROUP BY aggregation.
            LogicalPlanType::Aggregate(aggregate) => {
                let input: LogicalPlan =
                    into_logical_plan!(aggregate.input, ctx, extension_codec)?;
                let group_expr =
                    from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
                let aggr_expr =
                    from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .aggregate(group_expr, aggr_expr)?
                    .build()
            }
            // Scan over a ListingTable: rebuild schema, projection, filters,
            // sort order, file format and listing options from the message.
            LogicalPlanType::ListingScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;

                // Projection is serialized as column names; map back to indices.
                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let filters =
                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

                let mut all_sort_orders = vec![];
                for order in &scan.file_sort_order {
                    all_sort_orders.push(from_proto::parse_sorts(
                        &order.sort_expr_nodes,
                        ctx,
                        extension_codec,
                    )?)
                }

                // File formats are feature-gated: decoding a parquet/avro scan
                // panics when the corresponding cargo feature is disabled.
                let file_format: Arc<dyn FileFormat> =
                    match scan.file_format_type.as_ref().ok_or_else(|| {
                        proto_error(format!(
                            "logical_plan::from_proto() Unsupported file format '{self:?}'"
                        ))
                    })? {
                        #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
                        FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
                            #[cfg(feature = "parquet")]
                            {
                                let mut parquet = ParquetFormat::default();
                                if let Some(options) = options {
                                    parquet = parquet.with_options(options.try_into()?)
                                }
                                Arc::new(parquet)
                            }
                            #[cfg(not(feature = "parquet"))]
                            panic!("Unable to process parquet file since `parquet` feature is not enabled");
                        }
                        FileFormatType::Csv(protobuf::CsvFormat {
                            options
                        }) => {
                            let mut csv = CsvFormat::default();
                            if let Some(options) = options {
                                csv = csv.with_options(options.try_into()?)
                            }
                            Arc::new(csv)
                        },
                        FileFormatType::Json(protobuf::NdJsonFormat {
                            options
                        }) => {
                            let mut json = OtherNdJsonFormat::default();
                            if let Some(options) = options {
                                json = json.with_options(options.try_into()?)
                            }
                            Arc::new(json)
                        }
                        #[cfg_attr(not(feature = "avro"), allow(unused_variables))]
                        FileFormatType::Avro(..) => {
                            #[cfg(feature = "avro")]
                            {
                                Arc::new(AvroFormat)
                            }
                            #[cfg(not(feature = "avro"))]
                            panic!("Unable to process avro file since `avro` feature is not enabled");
                        }
                    };

                let table_paths = &scan
                    .paths
                    .iter()
                    .map(ListingTableUrl::parse)
                    .collect::<Result<Vec<_>, _>>()?;

                // Partition columns carry an explicit Arrow type each.
                let partition_columns = scan
                    .table_partition_cols
                    .iter()
                    .map(|col| {
                        let Some(arrow_type) = col.arrow_type.as_ref() else {
                            return Err(proto_error(
                                "Missing Arrow type in partition columns",
                            ));
                        };
                        let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
                            proto_error(format!("Received an unknown ArrowType: {e}"))
                        })?;
                        Ok((col.name.clone(), arrow_type))
                    })
                    .collect::<Result<Vec<_>>>()?;

                let options = ListingOptions::new(file_format)
                    .with_file_extension(&scan.file_extension)
                    .with_table_partition_cols(partition_columns)
                    .with_collect_stat(scan.collect_stat)
                    .with_target_partitions(scan.target_partitions as usize)
                    .with_file_sort_order(all_sort_orders);

                let config =
                    ListingTableConfig::new_with_multi_paths(table_paths.clone())
                        .with_listing_options(options)
                        .with_schema(Arc::new(schema));

                // Reuse the session's file statistics cache, if any.
                let provider = ListingTable::try_new(config)?.with_cache(
                    ctx.state()
                        .runtime_env()
                        .cache_manager
                        .get_file_statistic_cache(),
                );

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;

                LogicalPlanBuilder::scan_with_filters(
                    table_name,
                    provider_as_source(Arc::new(provider)),
                    projection,
                    filters,
                )?
                .build()
            }
            // Scan over a custom provider; its opaque bytes are decoded by the
            // extension codec.
            LogicalPlanType::CustomScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;
                let schema = Arc::new(schema);
                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let filters =
                    from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "CustomScan")?;

                let provider = extension_codec.try_decode_table_provider(
                    &scan.custom_table_data,
                    &table_name,
                    schema,
                    ctx,
                )?;

                LogicalPlanBuilder::scan_with_filters(
                    table_name,
                    provider_as_source(provider),
                    projection,
                    filters,
                )?
                .build()
            }
            // Sort with optional fetch; a negative serialized fetch fails the
            // usize conversion and so becomes None ("no limit").
            LogicalPlanType::Sort(sort) => {
                let input: LogicalPlan =
                    into_logical_plan!(sort.input, ctx, extension_codec)?;
                let sort_expr: Vec<SortExpr> =
                    from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
                let fetch: Option<usize> = sort.fetch.try_into().ok();
                LogicalPlanBuilder::from(input)
                    .sort_with_limit(sort_expr, fetch)?
                    .build()
            }
            // Repartition: either hash (exprs + partition count) or round-robin.
            LogicalPlanType::Repartition(repartition) => {
                use datafusion::logical_expr::Partitioning;
                let input: LogicalPlan =
                    into_logical_plan!(repartition.input, ctx, extension_codec)?;
                use protobuf::repartition_node::PartitionMethod;
                let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
                    internal_datafusion_err!(
                        "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
                    )
                })?;

                let partitioning_scheme = match pb_partition_method {
                    PartitionMethod::Hash(protobuf::HashRepartition {
                        hash_expr: pb_hash_expr,
                        partition_count,
                    }) => Partitioning::Hash(
                        from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
                        *partition_count as usize,
                    ),
                    PartitionMethod::RoundRobin(partition_count) => {
                        Partitioning::RoundRobinBatch(*partition_count as usize)
                    }
                };

                LogicalPlanBuilder::from(input)
                    .repartition(partitioning_scheme)?
                    .build()
            }
            // Empty relation (zero or one row, no columns).
            LogicalPlanType::EmptyRelation(empty_relation) => {
                LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
            }
            // CREATE EXTERNAL TABLE DDL; requires a registered
            // TableProviderFactory for the file type.
            LogicalPlanType::CreateExternalTable(create_extern_table) => {
                let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
                    ))
                })?;

                let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints.",
                    ))
                })?;
                // An empty definition string encodes "no definition".
                let definition = if !create_extern_table.definition.is_empty() {
                    Some(create_extern_table.definition.clone())
                } else {
                    None
                };

                let file_type = create_extern_table.file_type.as_str();
                if ctx.table_factory(file_type).is_none() {
                    internal_err!("No TableProviderFactory for file type: {file_type}")?
                }

                let mut order_exprs = vec![];
                for expr in &create_extern_table.order_exprs {
                    order_exprs.push(from_proto::parse_sorts(
                        &expr.sort_expr_nodes,
                        ctx,
                        extension_codec,
                    )?);
                }

                let mut column_defaults =
                    HashMap::with_capacity(create_extern_table.column_defaults.len());
                for (col_name, expr) in &create_extern_table.column_defaults {
                    let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
                    column_defaults.insert(col_name.clone(), expr);
                }

                Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                    CreateExternalTable {
                        schema: pb_schema.try_into()?,
                        name: from_table_reference(
                            create_extern_table.name.as_ref(),
                            "CreateExternalTable",
                        )?,
                        location: create_extern_table.location.clone(),
                        file_type: create_extern_table.file_type.clone(),
                        table_partition_cols: create_extern_table
                            .table_partition_cols
                            .clone(),
                        order_exprs,
                        if_not_exists: create_extern_table.if_not_exists,
                        temporary: create_extern_table.temporary,
                        definition,
                        unbounded: create_extern_table.unbounded,
                        options: create_extern_table.options.clone(),
                        constraints: constraints.into(),
                        column_defaults,
                    },
                )))
            }
            // CREATE VIEW DDL; the view's input plan is required.
            LogicalPlanType::CreateView(create_view) => {
                let plan = create_view
                    .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;
                let definition = if !create_view.definition.is_empty() {
                    Some(create_view.definition.clone())
                } else {
                    None
                };

                Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                    name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
                    temporary: create_view.temporary,
                    input: Arc::new(plan),
                    or_replace: create_view.or_replace,
                    definition,
                })))
            }
            // CREATE SCHEMA DDL.
            LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
                let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
                    ))
                })?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                    CreateCatalogSchema {
                        schema_name: create_catalog_schema.schema_name.clone(),
                        if_not_exists: create_catalog_schema.if_not_exists,
                        schema: pb_schema.try_into()?,
                    },
                )))
            }
            // CREATE DATABASE / CATALOG DDL.
            LogicalPlanType::CreateCatalog(create_catalog) => {
                let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
                    DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
                    ))
                })?;

                Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                    CreateCatalog {
                        catalog_name: create_catalog.catalog_name.clone(),
                        if_not_exists: create_catalog.if_not_exists,
                        schema: pb_schema.try_into()?,
                    },
                )))
            }
            // EXPLAIN ANALYZE (explain with analyze=true).
            LogicalPlanType::Analyze(analyze) => {
                let input: LogicalPlan =
                    into_logical_plan!(analyze.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .explain(analyze.verbose, true)?
                    .build()
            }
            // Plain EXPLAIN (analyze=false).
            LogicalPlanType::Explain(explain) => {
                let input: LogicalPlan =
                    into_logical_plan!(explain.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input)
                    .explain(explain.verbose, false)?
                    .build()
            }
            // Subquery alias over the decoded input.
            LogicalPlanType::SubqueryAlias(aliased_relation) => {
                let input: LogicalPlan =
                    into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
                let alias = from_table_reference(
                    aliased_relation.alias.as_ref(),
                    "SubqueryAlias",
                )?;
                LogicalPlanBuilder::from(input).alias(alias)?.build()
            }
            // LIMIT/OFFSET: negative skip clamps to 0; negative fetch means
            // "no limit".
            LogicalPlanType::Limit(limit) => {
                let input: LogicalPlan =
                    into_logical_plan!(limit.input, ctx, extension_codec)?;
                let skip = limit.skip.max(0) as usize;

                let fetch = if limit.fetch < 0 {
                    None
                } else {
                    Some(limit.fetch as usize)
                };

                LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
            }
            // Join: decode keys/type/constraint, then rebuild with either
            // expression keys (ON) or column keys (USING).
            LogicalPlanType::Join(join) => {
                let left_keys: Vec<Expr> =
                    from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
                let right_keys: Vec<Expr> =
                    from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
                let join_type =
                    protobuf::JoinType::try_from(join.join_type).map_err(|_| {
                        proto_error(format!(
                            "Received a JoinNode message with unknown JoinType {}",
                            join.join_type
                        ))
                    })?;
                let join_constraint = protobuf::JoinConstraint::try_from(
                    join.join_constraint,
                )
                .map_err(|_| {
                    proto_error(format!(
                        "Received a JoinNode message with unknown JoinConstraint {}",
                        join.join_constraint
                    ))
                })?;
                let filter: Option<Expr> = join
                    .filter
                    .as_ref()
                    .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
                    .map_or(Ok(None), |v| v.map(Some))?;

                let builder = LogicalPlanBuilder::from(into_logical_plan!(
                    join.left,
                    ctx,
                    extension_codec
                )?);
                let builder = match join_constraint.into() {
                    JoinConstraint::On => builder.join_with_expr_keys(
                        into_logical_plan!(join.right, ctx, extension_codec)?,
                        join_type.into(),
                        (left_keys, right_keys),
                        filter,
                    )?,
                    JoinConstraint::Using => {
                        // USING requires plain column references as keys.
                        let using_keys = left_keys
                            .into_iter()
                            .map(|key| {
                                key.try_as_col().cloned()
                                    .ok_or_else(|| internal_datafusion_err!(
                                        "Using join keys must be column references, got: {key:?}"
                                    ))
                            })
                            .collect::<Result<Vec<_>, _>>()?;
                        builder.join_using(
                            into_logical_plan!(join.right, ctx, extension_codec)?,
                            join_type.into(),
                            using_keys,
                        )?
                    }
                };

                builder.build()
            }
            // UNION over at least two decoded inputs, folded pairwise.
            LogicalPlanType::Union(union) => {
                if union.inputs.len() < 2 {
                    return Err( DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, Union was require at least two input.",
                    )));
                }
                // Safe: length checked above, so split_first cannot fail.
                let (first, rest) = union.inputs.split_first().unwrap();
                let mut builder = LogicalPlanBuilder::from(
                    first.try_into_logical_plan(ctx, extension_codec)?,
                );

                for i in rest {
                    let plan = i.try_into_logical_plan(ctx, extension_codec)?;
                    builder = builder.union(plan)?;
                }
                builder.build()
            }
            // CROSS JOIN of the two decoded sides.
            LogicalPlanType::CrossJoin(crossjoin) => {
                let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
                let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;

                LogicalPlanBuilder::from(left).cross_join(right)?.build()
            }
            // User-defined extension node; decoded via the extension codec with
            // its already-decoded child plans.
            LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
                let input_plans: Vec<LogicalPlan> = inputs
                    .iter()
                    .map(|i| i.try_into_logical_plan(ctx, extension_codec))
                    .collect::<Result<_>>()?;

                let extension_node =
                    extension_codec.try_decode(node, &input_plans, ctx)?;
                Ok(LogicalPlan::Extension(extension_node))
            }
            // SELECT DISTINCT.
            LogicalPlanType::Distinct(distinct) => {
                let input: LogicalPlan =
                    into_logical_plan!(distinct.input, ctx, extension_codec)?;
                LogicalPlanBuilder::from(input).distinct()?.build()
            }
            // DISTINCT ON (...) with optional ORDER BY.
            LogicalPlanType::DistinctOn(distinct_on) => {
                let input: LogicalPlan =
                    into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
                let on_expr =
                    from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
                let select_expr = from_proto::parse_exprs(
                    &distinct_on.select_expr,
                    ctx,
                    extension_codec,
                )?;
                // An empty sort list encodes "no ORDER BY".
                let sort_expr = match distinct_on.sort_expr.len() {
                    0 => None,
                    _ => Some(from_proto::parse_sorts(
                        &distinct_on.sort_expr,
                        ctx,
                        extension_codec,
                    )?),
                };
                LogicalPlanBuilder::from(input)
                    .distinct_on(on_expr, select_expr, sort_expr)?
                    .build()
            }
            // Scan over a view: rebuild a ViewTable from the decoded input
            // plan and optional SQL definition.
            LogicalPlanType::ViewScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;

                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                let input: LogicalPlan =
                    into_logical_plan!(scan.input, ctx, extension_codec)?;

                let definition = if !scan.definition.is_empty() {
                    Some(scan.definition.clone())
                } else {
                    None
                };

                let provider = ViewTable::new(input, definition);

                let table_name =
                    from_table_reference(scan.table_name.as_ref(), "ViewScan")?;

                LogicalPlanBuilder::scan(
                    table_name,
                    provider_as_source(Arc::new(provider)),
                    projection,
                )?
                .build()
            }
            // PREPARE statement with its declared parameter data types.
            LogicalPlanType::Prepare(prepare) => {
                let input: LogicalPlan =
                    into_logical_plan!(prepare.input, ctx, extension_codec)?;
                let data_types: Vec<DataType> = prepare
                    .data_types
                    .iter()
                    .map(DataType::try_from)
                    .collect::<Result<_, _>>()?;
                LogicalPlanBuilder::from(input)
                    .prepare(prepare.name.clone(), data_types)?
                    .build()
            }
            // DROP VIEW DDL.
            LogicalPlanType::DropView(dropview) => {
                Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                    name: from_table_reference(dropview.name.as_ref(), "DropView")?,
                    if_exists: dropview.if_exists,
                    schema: Arc::new(convert_required!(dropview.schema)?),
                })))
            }
            // COPY TO: the file format is decoded through the extension codec.
            LogicalPlanType::CopyTo(copy) => {
                let input: LogicalPlan =
                    into_logical_plan!(copy.input, ctx, extension_codec)?;

                let file_type: Arc<dyn FileType> = format_as_file_type(
                    extension_codec.try_decode_file_format(&copy.file_type, ctx)?,
                );

                Ok(LogicalPlan::Copy(dml::CopyTo::new(
                    Arc::new(input),
                    copy.output_url.clone(),
                    copy.partition_by.clone(),
                    file_type,
                    Default::default(),
                )))
            }
            // UNNEST of the listed columns with serialized options.
            LogicalPlanType::Unnest(unnest) => {
                let input: LogicalPlan =
                    into_logical_plan!(unnest.input, ctx, extension_codec)?;

                LogicalPlanBuilder::from(input)
                    .unnest_columns_with_options(
                        unnest.exec_columns.iter().map(|c| c.into()).collect(),
                        into_required!(unnest.options)?,
                    )?
                    .build()
            }
            // Recursive CTE: both the static and recursive terms are required.
            LogicalPlanType::RecursiveQuery(recursive_query_node) => {
                let static_term = recursive_query_node
                    .static_term
                    .as_ref()
                    .ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;

                let recursive_term = recursive_query_node
                    .recursive_term
                    .as_ref()
                    .ok_or_else(|| DataFusionError::Internal(String::from(
                        "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term.",
                    )))?
                    .try_into_logical_plan(ctx, extension_codec)?;

                Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
                    name: recursive_query_node.name.clone(),
                    static_term: Arc::new(static_term),
                    recursive_term: Arc::new(recursive_term),
                    is_distinct: recursive_query_node.is_distinct,
                }))
            }
            // Scan over the recursive CTE work table, keyed by CTE name.
            LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
                let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
                let schema = convert_required!(*schema)?;
                let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
                LogicalPlanBuilder::scan(
                    name.as_str(),
                    provider_as_source(Arc::new(cte_work_table)),
                    None,
                )?
                .build()
            }
            // DML (INSERT/UPDATE/DELETE...): the target table is encoded as a
            // TableScan node and unwrapped back into a TableSource.
            LogicalPlanType::Dml(dml_node) => Ok(LogicalPlan::Dml(
                datafusion::logical_expr::DmlStatement::new(
                    from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
                    to_table_source(&dml_node.target, ctx, extension_codec)?,
                    dml_node.dml_type().into(),
                    Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
                ),
            )),
        }
    }
971
972 fn try_from_logical_plan(
973 plan: &LogicalPlan,
974 extension_codec: &dyn LogicalExtensionCodec,
975 ) -> Result<Self>
976 where
977 Self: Sized,
978 {
979 match plan {
980 LogicalPlan::Values(Values { values, .. }) => {
981 let n_cols = if values.is_empty() {
982 0
983 } else {
984 values[0].len()
985 } as u64;
986 let values_list =
987 serialize_exprs(values.iter().flatten(), extension_codec)?;
988 Ok(LogicalPlanNode {
989 logical_plan_type: Some(LogicalPlanType::Values(
990 protobuf::ValuesNode {
991 n_cols,
992 values_list,
993 },
994 )),
995 })
996 }
997 LogicalPlan::TableScan(TableScan {
998 table_name,
999 source,
1000 filters,
1001 projection,
1002 ..
1003 }) => {
1004 let provider = source_as_provider(source)?;
1005 let schema = provider.schema();
1006 let source = provider.as_any();
1007
1008 let projection = match projection {
1009 None => None,
                    // Tail of the `projection` conversion: map each projected
                    // column index to its field name for the protobuf message.
                    Some(columns) => {
                        let column_names = columns
                            .iter()
                            .map(|i| schema.field(*i).name().to_owned())
                            .collect();
                        Some(protobuf::ProjectionColumns {
                            columns: column_names,
                        })
                    }
                };

                // Serialize the pushed-down filter expressions once; the result
                // is reused by the provider branches below.
                let filters: Vec<protobuf::LogicalExprNode> =
                    serialize_exprs(filters, extension_codec)?;

                // Dispatch on the concrete TableProvider behind the scan:
                // ListingTable, ViewTable, CteWorkTable, or (fallback) a custom
                // provider encoded by the extension codec.
                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                    let any = listing_table.options().format.as_any();
                    // Probe the file format by downcasting; parquet/avro only
                    // participate when the corresponding feature is compiled in.
                    let file_format_type = {
                        let mut maybe_some_type = None;

                        #[cfg(feature = "parquet")]
                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                            let options = parquet.options();
                            maybe_some_type =
                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                    options: Some(options.try_into()?),
                                }));
                        };

                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                            let options = csv.options();
                            maybe_some_type =
                                Some(FileFormatType::Csv(protobuf::CsvFormat {
                                    options: Some(options.try_into()?),
                                }));
                        }

                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                            let options = json.options();
                            maybe_some_type =
                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                    options: Some(options.try_into()?),
                                }))
                        }

                        #[cfg(feature = "avro")]
                        if any.is::<AvroFormat>() {
                            maybe_some_type =
                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                        }

                        // An unrecognized format is a hard error: the plan
                        // cannot be round-tripped through protobuf.
                        if let Some(file_format_type) = maybe_some_type {
                            file_format_type
                        } else {
                            return Err(proto_error(format!(
                                "Error converting file format, {:?} is invalid as a datafusion format.",
                                listing_table.options().format
                            )));
                        }
                    };

                    let options = listing_table.options();

                    // Strip partition columns from the serialized file schema;
                    // they are carried separately in `table_partition_cols`.
                    // Iterate in reverse so earlier removals do not shift the
                    // indices of fields not yet visited.
                    let mut builder = SchemaBuilder::from(schema.as_ref());
                    for (idx, field) in schema.fields().iter().enumerate().rev() {
                        if options
                            .table_partition_cols
                            .iter()
                            .any(|(name, _)| name == field.name())
                        {
                            builder.remove(idx);
                        }
                    }

                    let schema = builder.finish();

                    let schema: protobuf::Schema = (&schema).try_into()?;

                    // Serialize each declared file sort order (a Vec<SortExpr>
                    // per collection).
                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                    for order in &options.file_sort_order {
                        let expr_vec = SortExprNodeCollection {
                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                        };
                        exprs_vec.push(expr_vec);
                    }

                    // Partition columns serialize as (name, arrow type) pairs.
                    let partition_columns = options
                        .table_partition_cols
                        .iter()
                        .map(|(name, arrow_type)| {
                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
                                .map_err(|e| {
                                    proto_error(format!(
                                        "Received an unknown ArrowType: {e}"
                                    ))
                                })?;
                            Ok(protobuf::PartitionColumn {
                                name: name.clone(),
                                arrow_type: Some(arrow_type),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ListingScan(
                            protobuf::ListingTableScanNode {
                                file_format_type: Some(file_format_type),
                                table_name: Some(table_name.clone().into()),
                                collect_stat: options.collect_stat,
                                file_extension: options.file_extension.clone(),
                                table_partition_cols: partition_columns,
                                paths: listing_table
                                    .table_paths()
                                    .iter()
                                    .map(|x| x.to_string())
                                    .collect(),
                                schema: Some(schema),
                                projection,
                                filters,
                                target_partitions: options.target_partitions as u32,
                                file_sort_order: exprs_vec,
                            },
                        )),
                    })
                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                    // A view scan embeds its defining logical plan, serialized
                    // recursively.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                            protobuf::ViewTableScanNode {
                                table_name: Some(table_name.clone().into()),
                                input: Some(Box::new(
                                    LogicalPlanNode::try_from_logical_plan(
                                        view_table.logical_plan(),
                                        extension_codec,
                                    )?,
                                )),
                                schema: Some(schema),
                                projection,
                                definition: view_table
                                    .definition()
                                    .map(|s| s.to_string())
                                    .unwrap_or_default(),
                            },
                        ))),
                    })
                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
                {
                    // A recursive-CTE work table needs only its name and schema.
                    let name = cte_work_table.name().to_string();
                    let schema = cte_work_table.schema();
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                            protobuf::CteWorkTableScanNode {
                                name,
                                schema: Some(schema),
                            },
                        )),
                    })
                } else {
                    // Any other provider is delegated to the extension codec,
                    // which writes an opaque byte payload into the node.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    let mut bytes = vec![];
                    extension_codec
                        .try_encode_table_provider(table_name, provider, &mut bytes)
                        .map_err(|e| context!("Error serializing custom table", e))?;
                    let scan = CustomScan(CustomTableScanNode {
                        table_name: Some(table_name.clone().into()),
                        projection,
                        schema: Some(schema),
                        filters,
                        custom_table_data: bytes,
                    });
                    let node = LogicalPlanNode {
                        logical_plan_type: Some(scan),
                    };
                    Ok(node)
                }
            }
            // Projection: recurse into the input, then serialize each projected
            // expression. `optional_alias` is always encoded as None here.
            LogicalPlan::Projection(Projection { expr, input, .. }) => {
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
                        protobuf::ProjectionNode {
                            input: Some(Box::new(
                                LogicalPlanNode::try_from_logical_plan(
                                    input.as_ref(),
                                    extension_codec,
                                )?,
                            )),
                            expr: serialize_exprs(expr, extension_codec)?,
                            optional_alias: None,
                        },
                    ))),
                })
            }
            // Filter: serialized as the protobuf `Selection` node (historical
            // naming in the .proto definition).
            LogicalPlan::Filter(filter) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    filter.input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
                        protobuf::SelectionNode {
                            input: Some(Box::new(input)),
                            expr: Some(serialize_expr(
                                &filter.predicate,
                                extension_codec,
                            )?),
                        },
                    ))),
                })
            }
            // DISTINCT over all columns: only the input needs serializing.
            LogicalPlan::Distinct(Distinct::All(input)) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
                        protobuf::DistinctNode {
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            // DISTINCT ON: an absent sort_expr is encoded as an empty list.
            LogicalPlan::Distinct(Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                input,
                ..
            })) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                let sort_expr = match sort_expr {
                    None => vec![],
                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
                };
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
                        protobuf::DistinctOnNode {
                            on_expr: serialize_exprs(on_expr, extension_codec)?,
                            select_expr: serialize_exprs(select_expr, extension_codec)?,
                            sort_expr,
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            // Window: input plus the window expressions.
            LogicalPlan::Window(Window {
                input, window_expr, ..
            }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
                        protobuf::WindowNode {
                            input: Some(Box::new(input)),
                            window_expr: serialize_exprs(window_expr, extension_codec)?,
                        },
                    ))),
                })
            }
            // Aggregate: input plus group-by and aggregate expressions.
            LogicalPlan::Aggregate(Aggregate {
                group_expr,
                aggr_expr,
                input,
                ..
            }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
                        protobuf::AggregateNode {
                            input: Some(Box::new(input)),
                            group_expr: serialize_exprs(group_expr, extension_codec)?,
                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
                        },
                    ))),
                })
            }
            // Join: equi-join key pairs are split into parallel left/right key
            // vectors; the optional non-equi filter is serialized separately.
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                null_equality,
                ..
            }) => {
                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    left.as_ref(),
                    extension_codec,
                )?;
                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    right.as_ref(),
                    extension_codec,
                )?;
                // Serialize each (left, right) key pair, failing fast on the
                // first error, then unzip into the two protobuf key lists.
                let (left_join_key, right_join_key) = on
                    .iter()
                    .map(|(l, r)| {
                        Ok((
                            serialize_expr(l, extension_codec)?,
                            serialize_expr(r, extension_codec)?,
                        ))
                    })
                    .collect::<Result<Vec<_>, ToProtoError>>()?
                    .into_iter()
                    .unzip();
                let join_type: protobuf::JoinType = join_type.to_owned().into();
                let join_constraint: protobuf::JoinConstraint =
                    join_constraint.to_owned().into();
                let null_equality: protobuf::NullEquality =
                    null_equality.to_owned().into();
                let filter = filter
                    .as_ref()
                    .map(|e| serialize_expr(e, extension_codec))
                    .map_or(Ok(None), |v| v.map(Some))?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
                        protobuf::JoinNode {
                            left: Some(Box::new(left)),
                            right: Some(Box::new(right)),
                            join_type: join_type.into(),
                            join_constraint: join_constraint.into(),
                            left_join_key,
                            right_join_key,
                            null_equality: null_equality.into(),
                            filter,
                        },
                    ))),
                })
            }
            // Subqueries are not yet representable in the protobuf schema.
            LogicalPlan::Subquery(_) => {
                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
                        protobuf::SubqueryAliasNode {
                            input: Some(Box::new(input)),
                            alias: Some((*alias).clone().into()),
                        },
                    ))),
                })
            }
            // Limit: only literal skip/fetch values can be serialized; an
            // expression-valued limit is rejected.
            LogicalPlan::Limit(limit) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    limit.input.as_ref(),
                    extension_codec,
                )?;
                let SkipType::Literal(skip) = limit.get_skip_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal skip values",
                    ));
                };
                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal fetch values",
                    ));
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
                        protobuf::LimitNode {
                            input: Some(Box::new(input)),
                            skip: skip as i64,
                            // A missing fetch ("no limit") is encoded with the
                            // i64::MAX sentinel.
                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
                        },
                    ))),
                })
            }
            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                let sort_expr: Vec<protobuf::SortExprNode> =
                    serialize_sorts(expr, extension_codec)?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
                        protobuf::SortNode {
                            input: Some(Box::new(input)),
                            expr: sort_expr,
                            // A missing fetch is encoded with the -1 sentinel.
                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
                        },
                    ))),
                })
            }
            // Repartition: Hash and RoundRobinBatch schemes are supported;
            // DistributeBy is not serializable yet.
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => {
                use datafusion::logical_expr::Partitioning;
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;

                use protobuf::repartition_node::PartitionMethod;

                let pb_partition_method = match partitioning_scheme {
                    Partitioning::Hash(exprs, partition_count) => {
                        PartitionMethod::Hash(protobuf::HashRepartition {
                            hash_expr: serialize_exprs(exprs, extension_codec)?,
                            partition_count: *partition_count as u64,
                        })
                    }
                    Partitioning::RoundRobinBatch(partition_count) => {
                        PartitionMethod::RoundRobin(*partition_count as u64)
                    }
                    Partitioning::DistributeBy(_) => {
                        return not_impl_err!("DistributeBy")
                    }
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
                        protobuf::RepartitionNode {
                            input: Some(Box::new(input)),
                            partition_method: Some(pb_partition_method),
                        },
                    ))),
                })
            }
            // EmptyRelation: only the produce_one_row flag is serialized; the
            // schema is not carried in this node.
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row, ..
            }) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
                    protobuf::EmptyRelationNode {
                        produce_one_row: *produce_one_row,
                    },
                )),
            }),
            // CREATE EXTERNAL TABLE: every clause (sort orders, column
            // defaults, constraints, options, partitioning) is converted
            // field-by-field into the protobuf node.
            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                CreateExternalTable {
                    name,
                    location,
                    file_type,
                    schema: df_schema,
                    table_partition_cols,
                    if_not_exists,
                    definition,
                    order_exprs,
                    unbounded,
                    options,
                    constraints,
                    column_defaults,
                    temporary,
                },
            )) => {
                // Each ORDER BY clause is an independent sort-expression list.
                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
                for order in order_exprs {
                    let temp = SortExprNodeCollection {
                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                    };
                    converted_order_exprs.push(temp);
                }

                // Column defaults: name -> serialized default expression.
                let mut converted_column_defaults =
                    HashMap::with_capacity(column_defaults.len());
                for (col_name, expr) in column_defaults {
                    converted_column_defaults
                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
                }

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                        protobuf::CreateExternalTableNode {
                            name: Some(name.clone().into()),
                            location: location.clone(),
                            file_type: file_type.clone(),
                            schema: Some(df_schema.try_into()?),
                            table_partition_cols: table_partition_cols.clone(),
                            if_not_exists: *if_not_exists,
                            temporary: *temporary,
                            order_exprs: converted_order_exprs,
                            // A missing SQL definition is encoded as "".
                            definition: definition.clone().unwrap_or_default(),
                            unbounded: *unbounded,
                            options: options.clone(),
                            constraints: Some(constraints.clone().into()),
                            column_defaults: converted_column_defaults,
                        },
                    )),
                })
            }
            // CREATE VIEW: the view's defining plan is serialized recursively.
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                input,
                or_replace,
                definition,
                temporary,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
                    protobuf::CreateViewNode {
                        name: Some(name.clone().into()),
                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
                            input,
                            extension_codec,
                        )?)),
                        or_replace: *or_replace,
                        temporary: *temporary,
                        definition: definition.clone().unwrap_or_default(),
                    },
                ))),
            }),
            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name,
                    if_not_exists,
                    schema: df_schema,
                },
            )) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
                    protobuf::CreateCatalogSchemaNode {
                        schema_name: schema_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
                catalog_name,
                if_not_exists,
                schema: df_schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
                    protobuf::CreateCatalogNode {
                        catalog_name: catalog_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
            // ANALYZE: input plan plus the verbose flag.
            LogicalPlan::Analyze(a) => {
                let input = LogicalPlanNode::try_from_logical_plan(
                    a.input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
                        protobuf::AnalyzeNode {
                            input: Some(Box::new(input)),
                            verbose: a.verbose,
                        },
                    ))),
                })
            }
            // EXPLAIN: the explained plan plus the verbose flag.
            LogicalPlan::Explain(a) => {
                let input = LogicalPlanNode::try_from_logical_plan(
                    a.plan.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
                        protobuf::ExplainNode {
                            input: Some(Box::new(input)),
                            verbose: a.verbose,
                        },
                    ))),
                })
            }
            // UNION: serialize every input; collect() short-circuits on the
            // first failure.
            LogicalPlan::Union(union) => {
                let inputs: Vec<LogicalPlanNode> = union
                    .inputs
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Union(
                        protobuf::UnionNode { inputs },
                    )),
                })
            }
            // User-defined extension node: the codec encodes the node itself
            // into opaque bytes; its child plans are serialized recursively.
            LogicalPlan::Extension(extension) => {
                let mut buf: Vec<u8> = vec![];
                extension_codec.try_encode(extension, &mut buf)?;

                let inputs: Vec<LogicalPlanNode> = extension
                    .node
                    .inputs()
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Extension(
                        LogicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            // PREPARE statement: name, parameter data types, and the prepared
            // plan.
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                input,
            })) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
                        protobuf::PrepareNode {
                            name: name.clone(),
                            data_types: data_types
                                .iter()
                                .map(|t| t.try_into())
                                .collect::<Result<Vec<_>, _>>()?,
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            // UNNEST: list-typed columns carry a (input index, output column,
            // depth) recursion record; struct columns and dependency indices
            // are plain index lists.
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                list_type_columns,
                struct_type_columns,
                dependency_indices,
                schema,
                options,
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                let proto_unnest_list_items = list_type_columns
                    .iter()
                    .map(|(index, ul)| ColumnUnnestListItem {
                        input_index: *index as _,
                        recursion: Some(ColumnUnnestListRecursion {
                            output_column: Some(ul.output_column.to_owned().into()),
                            depth: ul.depth as _,
                        }),
                    })
                    .collect();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
                        protobuf::UnnestNode {
                            input: Some(Box::new(input)),
                            exec_columns: exec_columns
                                .iter()
                                .map(|col| col.into())
                                .collect(),
                            list_type_columns: proto_unnest_list_items,
                            struct_type_columns: struct_type_columns
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            dependency_indices: dependency_indices
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            schema: Some(schema.try_into()?),
                            options: Some(options.into()),
                        },
                    ))),
                })
            }
            // The following DDL variants have no protobuf representation yet
            // and fail with an explicit "not implemented" error.
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateIndex",
            )),
            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropTable",
            )),
            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                name,
                if_exists,
                schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::DropView(
                    protobuf::DropViewNode {
                        name: Some(name.clone().into()),
                        if_exists: *if_exists,
                        schema: Some(schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateFunction",
            )),
            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropFunction",
            )),
            // Catch-all for statements other than Prepare (handled above).
            LogicalPlan::Statement(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for Statement",
            )),
            // DML (INSERT/UPDATE/DELETE...): the target table provider is
            // serialized via from_table_source; the operation kind maps to the
            // protobuf dml_node::Type enum.
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                input,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                let dml_type: dml_node::Type = op.into();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
                        input: Some(Box::new(input)),
                        target: Some(Box::new(from_table_source(
                            table_name.clone(),
                            Arc::clone(target),
                            extension_codec,
                        )?)),
                        table_name: Some(table_name.clone().into()),
                        dml_type: dml_type.into(),
                    }))),
                })
            }
            // COPY TO: the output file format is encoded by the extension
            // codec into an opaque byte buffer.
            LogicalPlan::Copy(dml::CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                let mut buf = Vec::new();
                extension_codec
                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
                        protobuf::CopyToNode {
                            input: Some(Box::new(input)),
                            output_url: output_url.to_string(),
                            file_type: buf,
                            partition_by: partition_by.clone(),
                        },
                    ))),
                })
            }
            LogicalPlan::DescribeTable(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DescribeTable",
            )),
            // Recursive CTE query: both the static (base) and recursive terms
            // are serialized as child plans.
            LogicalPlan::RecursiveQuery(recursive) => {
                let static_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.static_term.as_ref(),
                    extension_codec,
                )?;
                let recursive_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.recursive_term.as_ref(),
                    extension_codec,
                )?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
                        protobuf::RecursiveQueryNode {
                            name: recursive.name.clone(),
                            static_term: Some(Box::new(static_term)),
                            recursive_term: Some(Box::new(recursive_term)),
                            is_distinct: recursive.is_distinct,
                        },
                    ))),
                })
            }
        }
    }
}