1use std::collections::HashMap;
19use std::fmt::Debug;
20use std::sync::Arc;
21
22use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
23use crate::protobuf::{
24 ColumnUnnestListItem, ColumnUnnestListRecursion, CteWorkTableScanNode,
25 CustomTableScanNode, DmlNode, SortExprNodeCollection, dml_node,
26};
27use crate::{
28 convert_required, into_required,
29 protobuf::{
30 self, LogicalExtensionNode, LogicalPlanNode,
31 listing_table_scan_node::FileFormatType, logical_plan_node::LogicalPlanType,
32 },
33};
34
35use crate::protobuf::{ToProtoError, proto_error};
36use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
37use datafusion_catalog::cte_worktable::CteWorkTable;
38use datafusion_catalog::empty::EmptyTable;
39use datafusion_common::file_options::file_type::FileType;
40use datafusion_common::format::ExplainFormat;
41use datafusion_common::{
42 Result, TableReference, ToDFSchema, assert_or_internal_err, context,
43 internal_datafusion_err, internal_err, not_impl_err, plan_err,
44};
45use datafusion_datasource::file_format::FileFormat;
46use datafusion_datasource::file_format::{
47 FileFormatFactory, file_type_to_format, format_as_file_type,
48};
49use datafusion_datasource_arrow::file_format::{ArrowFormat, ArrowFormatFactory};
50#[cfg(feature = "avro")]
51use datafusion_datasource_avro::file_format::AvroFormat;
52use datafusion_datasource_csv::file_format::{CsvFormat, CsvFormatFactory};
53use datafusion_datasource_json::file_format::{
54 JsonFormat as OtherNdJsonFormat, JsonFormatFactory,
55};
56#[cfg(feature = "parquet")]
57use datafusion_datasource_parquet::file_format::{ParquetFormat, ParquetFormatFactory};
58use datafusion_expr::{
59 AggregateUDF, DmlStatement, FetchType, HigherOrderUDF, RecursiveQuery, SkipType,
60 TableSource, Unnest,
61};
62use datafusion_expr::{
63 DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder, ScalarUDF, SortExpr,
64 Statement, WindowUDF, dml,
65 logical_plan::{
66 Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
67 DdlStatement, Distinct, EmptyRelation, Extension, Join, JoinConstraint, Prepare,
68 Projection, Repartition, Sort, SubqueryAlias, TableScan, Values, Window,
69 builder::project,
70 },
71};
72
73use self::to_proto::{serialize_expr, serialize_exprs};
74use crate::logical_plan::to_proto::serialize_sorts;
75use datafusion_catalog::TableProvider;
76use datafusion_catalog::default_table_source::{provider_as_source, source_as_provider};
77use datafusion_catalog::view::ViewTable;
78use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
79use datafusion_datasource::ListingTableUrl;
80use datafusion_execution::TaskContext;
81use prost::Message;
82use prost::bytes::BufMut;
83
84pub mod file_formats;
85pub mod from_proto;
86pub mod to_proto;
87
88pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
89 fn try_decode(buf: &[u8]) -> Result<Self>
90 where
91 Self: Sized;
92
93 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
94 where
95 B: BufMut,
96 Self: Sized;
97
98 fn try_into_logical_plan(
99 &self,
100 ctx: &TaskContext,
101 extension_codec: &dyn LogicalExtensionCodec,
102 ) -> Result<LogicalPlan>;
103
104 fn try_from_logical_plan(
105 plan: &LogicalPlan,
106 extension_codec: &dyn LogicalExtensionCodec,
107 ) -> Result<Self>
108 where
109 Self: Sized;
110}
111
112pub trait LogicalExtensionCodec: Debug + Send + Sync + std::any::Any {
113 fn try_decode(
114 &self,
115 buf: &[u8],
116 inputs: &[LogicalPlan],
117 ctx: &TaskContext,
118 ) -> Result<Extension>;
119
120 fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> Result<()>;
121
122 fn try_decode_table_provider(
123 &self,
124 buf: &[u8],
125 table_ref: &TableReference,
126 schema: SchemaRef,
127 ctx: &TaskContext,
128 ) -> Result<Arc<dyn TableProvider>>;
129
130 fn try_encode_table_provider(
131 &self,
132 table_ref: &TableReference,
133 node: Arc<dyn TableProvider>,
134 buf: &mut Vec<u8>,
135 ) -> Result<()>;
136
137 fn try_decode_file_format(
138 &self,
139 _buf: &[u8],
140 _ctx: &TaskContext,
141 ) -> Result<Arc<dyn FileFormatFactory>> {
142 not_impl_err!("LogicalExtensionCodec is not provided for file format")
143 }
144
145 fn try_encode_file_format(
146 &self,
147 _buf: &mut Vec<u8>,
148 _node: Arc<dyn FileFormatFactory>,
149 ) -> Result<()> {
150 Ok(())
151 }
152
153 fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
154 not_impl_err!("LogicalExtensionCodec is not provided for scalar function {name}")
155 }
156
157 fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
158 Ok(())
159 }
160
161 fn try_decode_higher_order_function(
162 &self,
163 name: &str,
164 _buf: &[u8],
165 ) -> Result<Arc<HigherOrderUDF>> {
166 not_impl_err!(
167 "LogicalExtensionCodec is not provided for higher order function {name}"
168 )
169 }
170
171 fn try_encode_higher_order_function(
172 &self,
173 _node: &HigherOrderUDF,
174 _buf: &mut Vec<u8>,
175 ) -> Result<()> {
176 Ok(())
177 }
178
179 fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
180 not_impl_err!(
181 "LogicalExtensionCodec is not provided for aggregate function {name}"
182 )
183 }
184
185 fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
186 Ok(())
187 }
188
189 fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
190 not_impl_err!("LogicalExtensionCodec is not provided for window function {name}")
191 }
192
193 fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
194 Ok(())
195 }
196}
197
/// Stateless, field-less codec type; its behavior comes entirely from its
/// `LogicalExtensionCodec` implementation.
#[derive(Clone, Debug)]
pub struct DefaultLogicalExtensionCodec {}
200
201impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
202 fn try_decode(
203 &self,
204 _buf: &[u8],
205 _inputs: &[LogicalPlan],
206 _ctx: &TaskContext,
207 ) -> Result<Extension> {
208 not_impl_err!("LogicalExtensionCodec is not provided")
209 }
210
211 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
212 not_impl_err!("LogicalExtensionCodec is not provided")
213 }
214
215 fn try_decode_table_provider(
216 &self,
217 _buf: &[u8],
218 _table_ref: &TableReference,
219 _schema: SchemaRef,
220 _ctx: &TaskContext,
221 ) -> Result<Arc<dyn TableProvider>> {
222 not_impl_err!("LogicalExtensionCodec is not provided")
223 }
224
225 fn try_encode_table_provider(
226 &self,
227 _table_ref: &TableReference,
228 _node: Arc<dyn TableProvider>,
229 _buf: &mut Vec<u8>,
230 ) -> Result<()> {
231 not_impl_err!("LogicalExtensionCodec is not provided")
232 }
233
234 fn try_decode_file_format(
235 &self,
236 buf: &[u8],
237 ctx: &TaskContext,
238 ) -> Result<Arc<dyn FileFormatFactory>> {
239 let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
240 internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
241 })?;
242
243 let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| {
244 internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind)
245 })?;
246
247 match kind {
248 protobuf::FileFormatKind::Csv => file_formats::CsvLogicalExtensionCodec
249 .try_decode_file_format(&proto.encoded_file_format, ctx),
250 protobuf::FileFormatKind::Json => file_formats::JsonLogicalExtensionCodec
251 .try_decode_file_format(&proto.encoded_file_format, ctx),
252 #[cfg(feature = "parquet")]
253 protobuf::FileFormatKind::Parquet => {
254 file_formats::ParquetLogicalExtensionCodec
255 .try_decode_file_format(&proto.encoded_file_format, ctx)
256 }
257 protobuf::FileFormatKind::Arrow => file_formats::ArrowLogicalExtensionCodec
258 .try_decode_file_format(&proto.encoded_file_format, ctx),
259 protobuf::FileFormatKind::Avro => file_formats::AvroLogicalExtensionCodec
260 .try_decode_file_format(&proto.encoded_file_format, ctx),
261 #[cfg(not(feature = "parquet"))]
262 protobuf::FileFormatKind::Parquet => {
263 not_impl_err!("Parquet support requires the 'parquet' feature")
264 }
265 protobuf::FileFormatKind::Unspecified => {
266 not_impl_err!("Unspecified file format kind")
267 }
268 }
269 }
270
271 fn try_encode_file_format(
272 &self,
273 buf: &mut Vec<u8>,
274 node: Arc<dyn FileFormatFactory>,
275 ) -> Result<()> {
276 let mut encoded_file_format = Vec::new();
277
278 let kind = if node.downcast_ref::<CsvFormatFactory>().is_some() {
279 file_formats::CsvLogicalExtensionCodec
280 .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
281 protobuf::FileFormatKind::Csv
282 } else if node.downcast_ref::<JsonFormatFactory>().is_some() {
283 file_formats::JsonLogicalExtensionCodec
284 .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
285 protobuf::FileFormatKind::Json
286 } else if node.downcast_ref::<ArrowFormatFactory>().is_some() {
287 file_formats::ArrowLogicalExtensionCodec
288 .try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
289 protobuf::FileFormatKind::Arrow
290 } else {
291 #[cfg(feature = "parquet")]
292 {
293 if node.downcast_ref::<ParquetFormatFactory>().is_some() {
294 file_formats::ParquetLogicalExtensionCodec.try_encode_file_format(
295 &mut encoded_file_format,
296 Arc::clone(&node),
297 )?;
298 protobuf::FileFormatKind::Parquet
299 } else {
300 return not_impl_err!(
301 "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec"
302 );
303 }
304 }
305 #[cfg(not(feature = "parquet"))]
306 {
307 return not_impl_err!(
308 "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec"
309 );
310 }
311 };
312
313 let proto = protobuf::FileFormatProto {
314 kind: kind as i32,
315 encoded_file_format,
316 };
317 proto.encode(buf).map_err(|e| {
318 internal_datafusion_err!("Failed to encode FileFormatProto: {e}")
319 })?;
320 Ok(())
321 }
322}
323
/// Converts an optional, boxed protobuf plan field into a logical plan.
///
/// * `$PB` - an expression supporting `.as_ref()` that yields an `Option`
///   of a reference to the boxed node (e.g. an `Option<Box<_>>` field).
/// * `$CTX`, `$CODEC` - forwarded unchanged to `try_into_logical_plan`.
///
/// Evaluates to the result of `try_into_logical_plan` when the field is
/// present, and to `Err(proto_error("Missing required field in protobuf"))`
/// when it is absent.
///
/// `proto_error` is referenced through `$crate` so that the expansion does
/// not depend on the call site having imported it; an exported macro that
/// names the function unqualified only compiles where the caller happens to
/// have `proto_error` in scope.
#[macro_export]
macro_rules! into_logical_plan {
    ($PB:expr, $CTX:expr, $CODEC:expr) => {{
        if let Some(field) = $PB.as_ref() {
            field.as_ref().try_into_logical_plan($CTX, $CODEC)
        } else {
            Err($crate::protobuf::proto_error(
                "Missing required field in protobuf",
            ))
        }
    }};
}
334
335fn from_table_reference(
336 table_ref: Option<&protobuf::TableReference>,
337 error_context: &str,
338) -> Result<TableReference> {
339 let table_ref = table_ref.ok_or_else(|| {
340 internal_datafusion_err!(
341 "Protobuf deserialization error, {error_context} was missing required field name."
342 )
343 })?;
344
345 Ok(table_ref.clone().try_into()?)
346}
347
348fn to_table_source(
352 node: &Option<Box<LogicalPlanNode>>,
353 ctx: &TaskContext,
354 extension_codec: &dyn LogicalExtensionCodec,
355) -> Result<Arc<dyn TableSource>> {
356 if let Some(node) = node {
357 match node.try_into_logical_plan(ctx, extension_codec)? {
358 LogicalPlan::TableScan(TableScan { source, .. }) => Ok(source),
359 _ => plan_err!("expected TableScan node"),
360 }
361 } else {
362 plan_err!("LogicalPlanNode should be provided")
363 }
364}
365
366fn from_table_source(
370 table_name: TableReference,
371 target: Arc<dyn TableSource>,
372 extension_codec: &dyn LogicalExtensionCodec,
373) -> Result<LogicalPlanNode> {
374 let projected_schema = target.schema().to_dfschema_ref()?;
375 let r = LogicalPlan::TableScan(TableScan {
376 table_name,
377 source: target,
378 projection: None,
379 projected_schema,
380 filters: vec![],
381 fetch: None,
382 });
383
384 LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
385}
386
387impl AsLogicalPlan for LogicalPlanNode {
388 fn try_decode(buf: &[u8]) -> Result<Self>
389 where
390 Self: Sized,
391 {
392 LogicalPlanNode::decode(buf)
393 .map_err(|e| internal_datafusion_err!("failed to decode logical plan: {e:?}"))
394 }
395
396 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
397 where
398 B: BufMut,
399 Self: Sized,
400 {
401 self.encode(buf)
402 .map_err(|e| internal_datafusion_err!("failed to encode logical plan: {e:?}"))
403 }
404
405 fn try_into_logical_plan(
406 &self,
407 ctx: &TaskContext,
408 extension_codec: &dyn LogicalExtensionCodec,
409 ) -> Result<LogicalPlan> {
410 let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
411 proto_error(format!(
412 "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
413 ))
414 })?;
415 match plan {
416 LogicalPlanType::Values(values) => {
417 let n_cols = values.n_cols as usize;
418 let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
419 Ok(Vec::new())
420 } else if values.values_list.len() % n_cols != 0 {
421 internal_err!(
422 "Invalid values list length, expect {} to be divisible by {}",
423 values.values_list.len(),
424 n_cols
425 )
426 } else {
427 values
428 .values_list
429 .chunks_exact(n_cols)
430 .map(|r| from_proto::parse_exprs(r, ctx, extension_codec))
431 .collect::<Result<Vec<_>, _>>()
432 .map_err(|e| e.into())
433 }?;
434
435 LogicalPlanBuilder::values(values)?.build()
436 }
437 LogicalPlanType::Projection(projection) => {
438 let input: LogicalPlan =
439 into_logical_plan!(projection.input, ctx, extension_codec)?;
440 let expr: Vec<Expr> =
441 from_proto::parse_exprs(&projection.expr, ctx, extension_codec)?;
442
443 let new_proj = project(input, expr)?;
444 match projection.optional_alias.as_ref() {
445 Some(a) => match a {
446 protobuf::projection_node::OptionalAlias::Alias(alias) => {
447 Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
448 Arc::new(new_proj),
449 alias.clone(),
450 )?))
451 }
452 },
453 _ => Ok(new_proj),
454 }
455 }
456 LogicalPlanType::Selection(selection) => {
457 let input: LogicalPlan =
458 into_logical_plan!(selection.input, ctx, extension_codec)?;
459 let expr: Expr = selection
460 .expr
461 .as_ref()
462 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
463 .transpose()?
464 .ok_or_else(|| proto_error("expression required"))?;
465 LogicalPlanBuilder::from(input).filter(expr)?.build()
466 }
467 LogicalPlanType::Window(window) => {
468 let input: LogicalPlan =
469 into_logical_plan!(window.input, ctx, extension_codec)?;
470 let window_expr =
471 from_proto::parse_exprs(&window.window_expr, ctx, extension_codec)?;
472 LogicalPlanBuilder::from(input).window(window_expr)?.build()
473 }
474 LogicalPlanType::Aggregate(aggregate) => {
475 let input: LogicalPlan =
476 into_logical_plan!(aggregate.input, ctx, extension_codec)?;
477 let group_expr =
478 from_proto::parse_exprs(&aggregate.group_expr, ctx, extension_codec)?;
479 let aggr_expr =
480 from_proto::parse_exprs(&aggregate.aggr_expr, ctx, extension_codec)?;
481 LogicalPlanBuilder::from(input)
482 .aggregate(group_expr, aggr_expr)?
483 .build()
484 }
485 LogicalPlanType::ListingScan(scan) => {
486 let schema: Schema = convert_required!(scan.schema)?;
487
488 let filters =
489 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
490
491 let mut all_sort_orders = vec![];
492 for order in &scan.file_sort_order {
493 all_sort_orders.push(from_proto::parse_sorts(
494 &order.sort_expr_nodes,
495 ctx,
496 extension_codec,
497 )?)
498 }
499
500 let file_format: Arc<dyn FileFormat> =
501 match scan.file_format_type.as_ref().ok_or_else(|| {
502 proto_error(format!(
503 "logical_plan::from_proto() Unsupported file format '{self:?}'"
504 ))
505 })? {
506 #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
507 FileFormatType::Parquet(protobuf::ParquetFormat {options}) => {
508 #[cfg(feature = "parquet")]
509 {
510 let mut parquet = ParquetFormat::default();
511 if let Some(options) = options {
512 parquet = parquet.with_options(options.try_into()?)
513 }
514 Arc::new(parquet)
515 }
516 #[cfg(not(feature = "parquet"))]
517 panic!("Unable to process parquet file since `parquet` feature is not enabled");
518 }
519 FileFormatType::Csv(protobuf::CsvFormat {
520 options
521 }) => {
522 let mut csv = CsvFormat::default();
523 if let Some(options) = options {
524 csv = csv.with_options(options.try_into()?)
525 }
526 Arc::new(csv)
527 },
528 FileFormatType::Json(protobuf::NdJsonFormat {
529 options
530 }) => {
531 let mut json = OtherNdJsonFormat::default();
532 if let Some(options) = options {
533 json = json.with_options(options.try_into()?)
534 }
535 Arc::new(json)
536 }
537 FileFormatType::Avro(..) => {
538 #[cfg(feature = "avro")]
539 {
540 Arc::new(AvroFormat)
541 }
542 #[cfg(not(feature = "avro"))]
543 {
544 panic!(
545 "Unable to process avro file since `avro` feature is not enabled"
546 );
547 }
548 }
549 FileFormatType::Arrow(..) => {
550 Arc::new(ArrowFormat)
551 }
552 };
553
554 let table_paths = &scan
555 .paths
556 .iter()
557 .map(ListingTableUrl::parse)
558 .collect::<Result<Vec<_>, _>>()?;
559
560 let partition_columns = scan
561 .table_partition_cols
562 .iter()
563 .map(|col| {
564 let Some(arrow_type) = col.arrow_type.as_ref() else {
565 return Err(proto_error(
566 "Missing Arrow type in partition columns",
567 ));
568 };
569 let arrow_type = DataType::try_from(arrow_type).map_err(|e| {
570 proto_error(format!("Received an unknown ArrowType: {e}"))
571 })?;
572 Ok((col.name.clone(), arrow_type))
573 })
574 .collect::<Result<Vec<_>>>()?;
575
576 let options = ListingOptions::new(file_format)
577 .with_file_extension(&scan.file_extension)
578 .with_table_partition_cols(partition_columns)
579 .with_collect_stat(scan.collect_stat)
580 .with_target_partitions(scan.target_partitions as usize)
581 .with_file_sort_order(all_sort_orders);
582
583 let config =
584 ListingTableConfig::new_with_multi_paths(table_paths.clone())
585 .with_listing_options(options)
586 .with_schema(Arc::new(schema));
587
588 let provider = ListingTable::try_new(config)?.with_cache(
589 ctx.runtime_env().cache_manager.get_file_statistic_cache(),
590 );
591
592 let table_name =
593 from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
594
595 let mut projection = None;
596 if let Some(columns) = &scan.projection {
597 let column_indices = columns
598 .columns
599 .iter()
600 .map(|name| provider.schema().index_of(name))
601 .collect::<Result<Vec<usize>, _>>()?;
602 projection = Some(column_indices);
603 }
604
605 LogicalPlanBuilder::scan_with_filters(
606 table_name,
607 provider_as_source(Arc::new(provider)),
608 projection,
609 filters,
610 )?
611 .build()
612 }
613 LogicalPlanType::CustomScan(scan) => {
614 let schema: Schema = convert_required!(scan.schema)?;
615 let schema = Arc::new(schema);
616 let mut projection = None;
617 if let Some(columns) = &scan.projection {
618 let column_indices = columns
619 .columns
620 .iter()
621 .map(|name| schema.index_of(name))
622 .collect::<Result<Vec<usize>, _>>()?;
623 projection = Some(column_indices);
624 }
625
626 let filters =
627 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
628
629 let table_name =
630 from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
631
632 let provider = extension_codec.try_decode_table_provider(
633 &scan.custom_table_data,
634 &table_name,
635 schema,
636 ctx,
637 )?;
638
639 LogicalPlanBuilder::scan_with_filters(
640 table_name,
641 provider_as_source(provider),
642 projection,
643 filters,
644 )?
645 .build()
646 }
647 LogicalPlanType::Sort(sort) => {
648 let input: LogicalPlan =
649 into_logical_plan!(sort.input, ctx, extension_codec)?;
650 let sort_expr: Vec<SortExpr> =
651 from_proto::parse_sorts(&sort.expr, ctx, extension_codec)?;
652 let fetch: Option<usize> = sort.fetch.try_into().ok();
653 LogicalPlanBuilder::from(input)
654 .sort_with_limit(sort_expr, fetch)?
655 .build()
656 }
657 LogicalPlanType::Repartition(repartition) => {
658 use datafusion_expr::Partitioning;
659 let input: LogicalPlan =
660 into_logical_plan!(repartition.input, ctx, extension_codec)?;
661 use protobuf::repartition_node::PartitionMethod;
662 let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
663 internal_datafusion_err!(
664 "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'"
665 )
666 })?;
667
668 let partitioning_scheme = match pb_partition_method {
669 PartitionMethod::Hash(protobuf::HashRepartition {
670 hash_expr: pb_hash_expr,
671 partition_count,
672 }) => Partitioning::Hash(
673 from_proto::parse_exprs(pb_hash_expr, ctx, extension_codec)?,
674 *partition_count as usize,
675 ),
676 PartitionMethod::RoundRobin(partition_count) => {
677 Partitioning::RoundRobinBatch(*partition_count as usize)
678 }
679 };
680
681 LogicalPlanBuilder::from(input)
682 .repartition(partitioning_scheme)?
683 .build()
684 }
685 LogicalPlanType::EmptyRelation(empty_relation) => {
686 LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
687 }
688 LogicalPlanType::CreateExternalTable(create_extern_table) => {
689 let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
690 internal_datafusion_err!(
691 "Protobuf deserialization error, CreateExternalTableNode was missing required field schema."
692 )
693 })?;
694
695 let constraints = (create_extern_table.constraints.clone()).ok_or_else(|| {
696 internal_datafusion_err!(
697 "Protobuf deserialization error, CreateExternalTableNode was missing required table constraints."
698 )
699 })?;
700 let definition = if !create_extern_table.definition.is_empty() {
701 Some(create_extern_table.definition.clone())
702 } else {
703 None
704 };
705
706 let mut order_exprs = vec![];
707 for expr in &create_extern_table.order_exprs {
708 order_exprs.push(from_proto::parse_sorts(
709 &expr.sort_expr_nodes,
710 ctx,
711 extension_codec,
712 )?);
713 }
714
715 let mut column_defaults =
716 HashMap::with_capacity(create_extern_table.column_defaults.len());
717 for (col_name, expr) in &create_extern_table.column_defaults {
718 let expr = from_proto::parse_expr(expr, ctx, extension_codec)?;
719 column_defaults.insert(col_name.clone(), expr);
720 }
721
722 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
723 CreateExternalTable::builder(
724 from_table_reference(
725 create_extern_table.name.as_ref(),
726 "CreateExternalTable",
727 )?,
728 create_extern_table.location.clone(),
729 create_extern_table.file_type.clone(),
730 pb_schema.try_into()?,
731 )
732 .with_partition_cols(create_extern_table.table_partition_cols.clone())
733 .with_order_exprs(order_exprs)
734 .with_if_not_exists(create_extern_table.if_not_exists)
735 .with_or_replace(create_extern_table.or_replace)
736 .with_temporary(create_extern_table.temporary)
737 .with_definition(definition)
738 .with_unbounded(create_extern_table.unbounded)
739 .with_options(create_extern_table.options.clone())
740 .with_constraints(constraints.into())
741 .with_column_defaults(column_defaults)
742 .build(),
743 )))
744 }
745 LogicalPlanType::CreateView(create_view) => {
746 let plan = create_view
747 .input.clone().ok_or_else(|| internal_datafusion_err!(
748 "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input."
749 ))?
750 .try_into_logical_plan(ctx, extension_codec)?;
751 let definition = if !create_view.definition.is_empty() {
752 Some(create_view.definition.clone())
753 } else {
754 None
755 };
756
757 Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
758 name: from_table_reference(create_view.name.as_ref(), "CreateView")?,
759 temporary: create_view.temporary,
760 input: Arc::new(plan),
761 or_replace: create_view.or_replace,
762 definition,
763 })))
764 }
765 LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
766 let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
767 internal_datafusion_err!(
768 "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema."
769 )
770 })?;
771
772 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
773 CreateCatalogSchema {
774 schema_name: create_catalog_schema.schema_name.clone(),
775 if_not_exists: create_catalog_schema.if_not_exists,
776 schema: pb_schema.try_into()?,
777 },
778 )))
779 }
780 LogicalPlanType::CreateCatalog(create_catalog) => {
781 let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
782 internal_datafusion_err!(
783 "Protobuf deserialization error, CreateCatalogNode was missing required field schema."
784 )
785 })?;
786
787 Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
788 CreateCatalog {
789 catalog_name: create_catalog.catalog_name.clone(),
790 if_not_exists: create_catalog.if_not_exists,
791 schema: pb_schema.try_into()?,
792 },
793 )))
794 }
795 LogicalPlanType::Analyze(analyze) => {
796 let input: LogicalPlan =
797 into_logical_plan!(analyze.input, ctx, extension_codec)?;
798 LogicalPlanBuilder::from(input)
799 .explain(analyze.verbose, true)?
800 .build()
801 }
802 LogicalPlanType::Explain(explain) => {
803 let input: LogicalPlan =
804 into_logical_plan!(explain.input, ctx, extension_codec)?;
805 let pb_format = protobuf::ExplainFormat::try_from(explain.format)
806 .map_err(|_| {
807 proto_error(format!(
808 "Received an ExplainNode message with unknown ExplainFormat {}",
809 explain.format
810 ))
811 })?;
812 let explain_format = match pb_format {
813 protobuf::ExplainFormat::Indent => ExplainFormat::Indent,
814 protobuf::ExplainFormat::Tree => ExplainFormat::Tree,
815 protobuf::ExplainFormat::Pgjson => ExplainFormat::PostgresJSON,
816 protobuf::ExplainFormat::Graphviz => ExplainFormat::Graphviz,
817 };
818 let explain_option =
819 datafusion_expr::logical_plan::ExplainOption::default()
820 .with_verbose(explain.verbose)
821 .with_format(explain_format);
822 LogicalPlanBuilder::from(input)
823 .explain_option_format(explain_option)?
824 .build()
825 }
826 LogicalPlanType::SubqueryAlias(aliased_relation) => {
827 let input: LogicalPlan =
828 into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
829 let alias = from_table_reference(
830 aliased_relation.alias.as_ref(),
831 "SubqueryAlias",
832 )?;
833 LogicalPlanBuilder::from(input).alias(alias)?.build()
834 }
835 LogicalPlanType::Limit(limit) => {
836 let input: LogicalPlan =
837 into_logical_plan!(limit.input, ctx, extension_codec)?;
838 let skip = limit.skip.max(0) as usize;
839
840 let fetch = if limit.fetch < 0 {
841 None
842 } else {
843 Some(limit.fetch as usize)
844 };
845
846 LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
847 }
848 LogicalPlanType::Join(join) => {
849 let left_keys: Vec<Expr> =
850 from_proto::parse_exprs(&join.left_join_key, ctx, extension_codec)?;
851 let right_keys: Vec<Expr> =
852 from_proto::parse_exprs(&join.right_join_key, ctx, extension_codec)?;
853 let join_type =
854 protobuf::JoinType::try_from(join.join_type).map_err(|_| {
855 proto_error(format!(
856 "Received a JoinNode message with unknown JoinType {}",
857 join.join_type
858 ))
859 })?;
860 let join_constraint = protobuf::JoinConstraint::try_from(
861 join.join_constraint,
862 )
863 .map_err(|_| {
864 proto_error(format!(
865 "Received a JoinNode message with unknown JoinConstraint {}",
866 join.join_constraint
867 ))
868 })?;
869 let filter: Option<Expr> = join
870 .filter
871 .as_ref()
872 .map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
873 .map_or(Ok(None), |v| v.map(Some))?;
874
875 let builder = LogicalPlanBuilder::from(into_logical_plan!(
876 join.left,
877 ctx,
878 extension_codec
879 )?);
880 let builder = match join_constraint.into() {
881 JoinConstraint::On => builder.join_with_expr_keys(
882 into_logical_plan!(join.right, ctx, extension_codec)?,
883 join_type.into(),
884 (left_keys, right_keys),
885 filter,
886 )?,
887 JoinConstraint::Using => {
888 let using_keys = left_keys
890 .into_iter()
891 .map(|key| {
892 key.try_as_col().cloned()
893 .ok_or_else(|| internal_datafusion_err!(
894 "Using join keys must be column references, got: {key:?}"
895 ))
896 })
897 .collect::<Result<Vec<_>, _>>()?;
898 builder.join_using(
899 into_logical_plan!(join.right, ctx, extension_codec)?,
900 join_type.into(),
901 using_keys,
902 )?
903 }
904 };
905
906 builder.build()
907 }
908 LogicalPlanType::Union(union) => {
909 assert_or_internal_err!(
910 union.inputs.len() >= 2,
911 "Protobuf deserialization error, Union requires at least two inputs."
912 );
913 let (first, rest) = union.inputs.split_first().unwrap();
914 let mut builder = LogicalPlanBuilder::from(
915 first.try_into_logical_plan(ctx, extension_codec)?,
916 );
917
918 for i in rest {
919 let plan = i.try_into_logical_plan(ctx, extension_codec)?;
920 builder = builder.union(plan)?;
921 }
922 builder.build()
923 }
924 LogicalPlanType::CrossJoin(crossjoin) => {
925 let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
926 let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
927
928 LogicalPlanBuilder::from(left).cross_join(right)?.build()
929 }
930 LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
931 let input_plans: Vec<LogicalPlan> = inputs
932 .iter()
933 .map(|i| i.try_into_logical_plan(ctx, extension_codec))
934 .collect::<Result<_>>()?;
935
936 let extension_node =
937 extension_codec.try_decode(node, &input_plans, ctx)?;
938 Ok(LogicalPlan::Extension(extension_node))
939 }
940 LogicalPlanType::Distinct(distinct) => {
941 let input: LogicalPlan =
942 into_logical_plan!(distinct.input, ctx, extension_codec)?;
943 LogicalPlanBuilder::from(input).distinct()?.build()
944 }
945 LogicalPlanType::DistinctOn(distinct_on) => {
946 let input: LogicalPlan =
947 into_logical_plan!(distinct_on.input, ctx, extension_codec)?;
948 let on_expr =
949 from_proto::parse_exprs(&distinct_on.on_expr, ctx, extension_codec)?;
950 let select_expr = from_proto::parse_exprs(
951 &distinct_on.select_expr,
952 ctx,
953 extension_codec,
954 )?;
955 let sort_expr = match distinct_on.sort_expr.len() {
956 0 => None,
957 _ => Some(from_proto::parse_sorts(
958 &distinct_on.sort_expr,
959 ctx,
960 extension_codec,
961 )?),
962 };
963 LogicalPlanBuilder::from(input)
964 .distinct_on(on_expr, select_expr, sort_expr)?
965 .build()
966 }
967 LogicalPlanType::ViewScan(scan) => {
968 let schema: Schema = convert_required!(scan.schema)?;
969
970 let mut projection = None;
971 if let Some(columns) = &scan.projection {
972 let column_indices = columns
973 .columns
974 .iter()
975 .map(|name| schema.index_of(name))
976 .collect::<Result<Vec<usize>, _>>()?;
977 projection = Some(column_indices);
978 }
979
980 let input: LogicalPlan =
981 into_logical_plan!(scan.input, ctx, extension_codec)?;
982
983 let definition = if !scan.definition.is_empty() {
984 Some(scan.definition.clone())
985 } else {
986 None
987 };
988
989 let provider = ViewTable::new(input, definition);
990
991 let table_name =
992 from_table_reference(scan.table_name.as_ref(), "ViewScan")?;
993
994 LogicalPlanBuilder::scan(
995 table_name,
996 provider_as_source(Arc::new(provider)),
997 projection,
998 )?
999 .build()
1000 }
1001 LogicalPlanType::Prepare(prepare) => {
1002 let input: LogicalPlan =
1003 into_logical_plan!(prepare.input, ctx, extension_codec)?;
1004 let data_types: Vec<DataType> = prepare
1005 .data_types
1006 .iter()
1007 .map(DataType::try_from)
1008 .collect::<Result<_, _>>()?;
1009 let fields: Vec<Field> = prepare
1010 .fields
1011 .iter()
1012 .map(Field::try_from)
1013 .collect::<Result<_, _>>()?;
1014
1015 if fields.is_empty() {
1019 LogicalPlanBuilder::from(input)
1020 .prepare(
1021 prepare.name.clone(),
1022 data_types
1023 .into_iter()
1024 .map(|dt| Field::new("", dt, true).into())
1025 .collect(),
1026 )?
1027 .build()
1028 } else {
1029 LogicalPlanBuilder::from(input)
1030 .prepare(
1031 prepare.name.clone(),
1032 fields.into_iter().map(|f| f.into()).collect(),
1033 )?
1034 .build()
1035 }
1036 }
1037 LogicalPlanType::DropView(dropview) => {
1038 Ok(LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1039 name: from_table_reference(dropview.name.as_ref(), "DropView")?,
1040 if_exists: dropview.if_exists,
1041 schema: Arc::new(convert_required!(dropview.schema)?),
1042 })))
1043 }
1044 LogicalPlanType::CopyTo(copy) => {
1045 let input: LogicalPlan =
1046 into_logical_plan!(copy.input, ctx, extension_codec)?;
1047
1048 let file_type: Arc<dyn FileType> = format_as_file_type(
1049 extension_codec.try_decode_file_format(©.file_type, ctx)?,
1050 );
1051
1052 Ok(LogicalPlan::Copy(dml::CopyTo::new(
1053 Arc::new(input),
1054 copy.output_url.clone(),
1055 copy.partition_by.clone(),
1056 file_type,
1057 Default::default(),
1058 )))
1059 }
1060 LogicalPlanType::Unnest(unnest) => {
1061 let input: LogicalPlan =
1062 into_logical_plan!(unnest.input, ctx, extension_codec)?;
1063
1064 LogicalPlanBuilder::from(input)
1065 .unnest_columns_with_options(
1066 unnest.exec_columns.iter().map(|c| c.into()).collect(),
1067 into_required!(unnest.options)?,
1068 )?
1069 .build()
1070 }
1071 LogicalPlanType::RecursiveQuery(recursive_query_node) => {
1072 let static_term = recursive_query_node
1073 .static_term
1074 .as_ref()
1075 .ok_or_else(|| internal_datafusion_err!(
1076 "Protobuf deserialization error, RecursiveQueryNode was missing required field static_term."
1077 ))?
1078 .try_into_logical_plan(ctx, extension_codec)?;
1079
1080 let recursive_term = recursive_query_node
1081 .recursive_term
1082 .as_ref()
1083 .ok_or_else(|| internal_datafusion_err!(
1084 "Protobuf deserialization error, RecursiveQueryNode was missing required field recursive_term."
1085 ))?
1086 .try_into_logical_plan(ctx, extension_codec)?;
1087
1088 Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
1089 name: recursive_query_node.name.clone(),
1090 static_term: Arc::new(static_term),
1091 recursive_term: Arc::new(recursive_term),
1092 is_distinct: recursive_query_node.is_distinct,
1093 }))
1094 }
1095 LogicalPlanType::CteWorkTableScan(cte_work_table_scan_node) => {
1096 let CteWorkTableScanNode { name, schema } = cte_work_table_scan_node;
1097 let schema = convert_required!(*schema)?;
1098 let cte_work_table = CteWorkTable::new(name.as_str(), Arc::new(schema));
1099 LogicalPlanBuilder::scan(
1100 name.as_str(),
1101 provider_as_source(Arc::new(cte_work_table)),
1102 None,
1103 )?
1104 .build()
1105 }
1106 LogicalPlanType::EmptyTableScan(scan) => {
1107 let schema: Schema = convert_required!(scan.schema)?;
1108 let schema = Arc::new(schema);
1109 let mut projection = None;
1110 if let Some(columns) = &scan.projection {
1111 let column_indices = columns
1112 .columns
1113 .iter()
1114 .map(|name| schema.index_of(name))
1115 .collect::<Result<Vec<usize>, _>>()?;
1116 projection = Some(column_indices);
1117 }
1118
1119 let filters =
1120 from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
1121
1122 let table_name =
1123 from_table_reference(scan.table_name.as_ref(), "EmptyTableScan")?;
1124
1125 let provider = Arc::new(EmptyTable::new(Arc::clone(&schema)));
1126
1127 LogicalPlanBuilder::scan_with_filters(
1128 table_name,
1129 provider_as_source(provider),
1130 projection,
1131 filters,
1132 )?
1133 .build()
1134 }
1135 LogicalPlanType::Dml(dml_node) => {
1136 Ok(LogicalPlan::Dml(datafusion_expr::DmlStatement::new(
1137 from_table_reference(dml_node.table_name.as_ref(), "DML ")?,
1138 to_table_source(&dml_node.target, ctx, extension_codec)?,
1139 dml_node.dml_type().into(),
1140 Arc::new(into_logical_plan!(dml_node.input, ctx, extension_codec)?),
1141 )))
1142 }
1143 }
1144 }
1145
1146 fn try_from_logical_plan(
1147 plan: &LogicalPlan,
1148 extension_codec: &dyn LogicalExtensionCodec,
1149 ) -> Result<Self>
1150 where
1151 Self: Sized,
1152 {
1153 match plan {
1154 LogicalPlan::Values(Values { values, .. }) => {
1155 let n_cols = if values.is_empty() {
1156 0
1157 } else {
1158 values[0].len()
1159 } as u64;
1160 let values_list =
1161 serialize_exprs(values.iter().flatten(), extension_codec)?;
1162 Ok(LogicalPlanNode {
1163 logical_plan_type: Some(LogicalPlanType::Values(
1164 protobuf::ValuesNode {
1165 n_cols,
1166 values_list,
1167 },
1168 )),
1169 })
1170 }
1171 LogicalPlan::TableScan(TableScan {
1172 table_name,
1173 source,
1174 filters,
1175 projection,
1176 ..
1177 }) => {
1178 let provider = source_as_provider(source)?;
1179 let schema = provider.schema();
1180
1181 let projection = match projection {
1182 None => None,
1183 Some(columns) => {
1184 let column_names = columns
1185 .iter()
1186 .map(|i| schema.field(*i).name().to_owned())
1187 .collect();
1188 Some(protobuf::ProjectionColumns {
1189 columns: column_names,
1190 })
1191 }
1192 };
1193
1194 let filters: Vec<protobuf::LogicalExprNode> =
1195 serialize_exprs(filters, extension_codec)?;
1196
1197 if let Some(listing_table) = provider.downcast_ref::<ListingTable>() {
1198 let format = listing_table.options().format.as_ref();
1199 let file_format_type = {
1200 let mut maybe_some_type = None;
1201
1202 #[cfg(feature = "parquet")]
1203 if let Some(parquet) = format.downcast_ref::<ParquetFormat>() {
1204 let options = parquet.options();
1205 maybe_some_type =
1206 Some(FileFormatType::Parquet(protobuf::ParquetFormat {
1207 options: Some(options.try_into()?),
1208 }));
1209 };
1210
1211 if let Some(csv) = format.downcast_ref::<CsvFormat>() {
1212 let options = csv.options();
1213 maybe_some_type =
1214 Some(FileFormatType::Csv(protobuf::CsvFormat {
1215 options: Some(options.try_into()?),
1216 }));
1217 }
1218
1219 if let Some(json) = format.downcast_ref::<OtherNdJsonFormat>() {
1220 let options = json.options();
1221 maybe_some_type =
1222 Some(FileFormatType::Json(protobuf::NdJsonFormat {
1223 options: Some(options.try_into()?),
1224 }))
1225 }
1226
1227 #[cfg(feature = "avro")]
1228 if format.is::<AvroFormat>() {
1229 maybe_some_type =
1230 Some(FileFormatType::Avro(protobuf::AvroFormat {}))
1231 }
1232
1233 if format.is::<ArrowFormat>() {
1234 maybe_some_type =
1235 Some(FileFormatType::Arrow(protobuf::ArrowFormat {}))
1236 }
1237
1238 if let Some(file_format_type) = maybe_some_type {
1239 file_format_type
1240 } else {
1241 return Err(proto_error(format!(
1242 "Error deserializing unknown file format: {:?}",
1243 listing_table.options().format
1244 )));
1245 }
1246 };
1247
1248 let options = listing_table.options();
1249
1250 let mut builder = SchemaBuilder::from(schema.as_ref());
1251 for (idx, field) in schema.fields().iter().enumerate().rev() {
1252 if options
1253 .table_partition_cols
1254 .iter()
1255 .any(|(name, _)| name == field.name())
1256 {
1257 builder.remove(idx);
1258 }
1259 }
1260
1261 let schema = builder.finish();
1262
1263 let schema: protobuf::Schema = (&schema).try_into()?;
1264
1265 let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
1266 for order in &options.file_sort_order {
1267 let expr_vec = SortExprNodeCollection {
1268 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1269 };
1270 exprs_vec.push(expr_vec);
1271 }
1272
1273 let partition_columns = options
1274 .table_partition_cols
1275 .iter()
1276 .map(|(name, arrow_type)| {
1277 let arrow_type = protobuf::ArrowType::try_from(arrow_type)
1278 .map_err(|e| {
1279 proto_error(format!(
1280 "Received an unknown ArrowType: {e}"
1281 ))
1282 })?;
1283 Ok(protobuf::PartitionColumn {
1284 name: name.clone(),
1285 arrow_type: Some(arrow_type),
1286 })
1287 })
1288 .collect::<Result<Vec<_>>>()?;
1289
1290 Ok(LogicalPlanNode {
1291 logical_plan_type: Some(LogicalPlanType::ListingScan(
1292 protobuf::ListingTableScanNode {
1293 file_format_type: Some(file_format_type),
1294 table_name: Some(table_name.clone().into()),
1295 collect_stat: options.collect_stat,
1296 file_extension: options.file_extension.clone(),
1297 table_partition_cols: partition_columns,
1298 paths: listing_table
1299 .table_paths()
1300 .iter()
1301 .map(|x| x.to_string())
1302 .collect(),
1303 schema: Some(schema),
1304 projection,
1305 filters,
1306 target_partitions: options.target_partitions as u32,
1307 file_sort_order: exprs_vec,
1308 },
1309 )),
1310 })
1311 } else if let Some(view_table) = provider.downcast_ref::<ViewTable>() {
1312 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1313 Ok(LogicalPlanNode {
1314 logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
1315 protobuf::ViewTableScanNode {
1316 table_name: Some(table_name.clone().into()),
1317 input: Some(Box::new(
1318 LogicalPlanNode::try_from_logical_plan(
1319 view_table.logical_plan(),
1320 extension_codec,
1321 )?,
1322 )),
1323 schema: Some(schema),
1324 projection,
1325 definition: view_table
1326 .definition()
1327 .map(|s| s.to_string())
1328 .unwrap_or_default(),
1329 },
1330 ))),
1331 })
1332 } else if let Some(cte_work_table) =
1333 provider.downcast_ref::<CteWorkTable>()
1334 {
1335 let name = cte_work_table.name().to_string();
1336 let schema = cte_work_table.schema();
1337 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1338
1339 Ok(LogicalPlanNode {
1340 logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
1341 protobuf::CteWorkTableScanNode {
1342 name,
1343 schema: Some(schema),
1344 },
1345 )),
1346 })
1347 } else if provider.downcast_ref::<EmptyTable>().is_some() {
1348 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1349
1350 Ok(LogicalPlanNode {
1351 logical_plan_type: Some(LogicalPlanType::EmptyTableScan(
1352 protobuf::EmptyTableScanNode {
1353 table_name: Some(table_name.clone().into()),
1354 schema: Some(schema),
1355 projection,
1356 filters,
1357 },
1358 )),
1359 })
1360 } else {
1361 let schema: protobuf::Schema = schema.as_ref().try_into()?;
1362 let mut bytes = vec![];
1363 extension_codec
1364 .try_encode_table_provider(table_name, provider, &mut bytes)
1365 .map_err(|e| context!("Error serializing custom table", e))?;
1366 let scan = CustomScan(CustomTableScanNode {
1367 table_name: Some(table_name.clone().into()),
1368 projection,
1369 schema: Some(schema),
1370 filters,
1371 custom_table_data: bytes,
1372 });
1373 let node = LogicalPlanNode {
1374 logical_plan_type: Some(scan),
1375 };
1376 Ok(node)
1377 }
1378 }
1379 LogicalPlan::Projection(Projection { expr, input, .. }) => {
1380 Ok(LogicalPlanNode {
1381 logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
1382 protobuf::ProjectionNode {
1383 input: Some(Box::new(
1384 LogicalPlanNode::try_from_logical_plan(
1385 input.as_ref(),
1386 extension_codec,
1387 )?,
1388 )),
1389 expr: serialize_exprs(expr, extension_codec)?,
1390 optional_alias: None,
1391 },
1392 ))),
1393 })
1394 }
1395 LogicalPlan::Filter(filter) => {
1396 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1397 filter.input.as_ref(),
1398 extension_codec,
1399 )?;
1400 Ok(LogicalPlanNode {
1401 logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
1402 protobuf::SelectionNode {
1403 input: Some(Box::new(input)),
1404 expr: Some(Box::new(serialize_expr(
1405 &filter.predicate,
1406 extension_codec,
1407 )?)),
1408 },
1409 ))),
1410 })
1411 }
1412 LogicalPlan::Distinct(Distinct::All(input)) => {
1413 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1414 input.as_ref(),
1415 extension_codec,
1416 )?;
1417 Ok(LogicalPlanNode {
1418 logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
1419 protobuf::DistinctNode {
1420 input: Some(Box::new(input)),
1421 },
1422 ))),
1423 })
1424 }
1425 LogicalPlan::Distinct(Distinct::On(DistinctOn {
1426 on_expr,
1427 select_expr,
1428 sort_expr,
1429 input,
1430 ..
1431 })) => {
1432 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1433 input.as_ref(),
1434 extension_codec,
1435 )?;
1436 let sort_expr = match sort_expr {
1437 None => vec![],
1438 Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
1439 };
1440 Ok(LogicalPlanNode {
1441 logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
1442 protobuf::DistinctOnNode {
1443 on_expr: serialize_exprs(on_expr, extension_codec)?,
1444 select_expr: serialize_exprs(select_expr, extension_codec)?,
1445 sort_expr,
1446 input: Some(Box::new(input)),
1447 },
1448 ))),
1449 })
1450 }
1451 LogicalPlan::Window(Window {
1452 input, window_expr, ..
1453 }) => {
1454 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1455 input.as_ref(),
1456 extension_codec,
1457 )?;
1458 Ok(LogicalPlanNode {
1459 logical_plan_type: Some(LogicalPlanType::Window(Box::new(
1460 protobuf::WindowNode {
1461 input: Some(Box::new(input)),
1462 window_expr: serialize_exprs(window_expr, extension_codec)?,
1463 },
1464 ))),
1465 })
1466 }
1467 LogicalPlan::Aggregate(Aggregate {
1468 group_expr,
1469 aggr_expr,
1470 input,
1471 ..
1472 }) => {
1473 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1474 input.as_ref(),
1475 extension_codec,
1476 )?;
1477 Ok(LogicalPlanNode {
1478 logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
1479 protobuf::AggregateNode {
1480 input: Some(Box::new(input)),
1481 group_expr: serialize_exprs(group_expr, extension_codec)?,
1482 aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
1483 },
1484 ))),
1485 })
1486 }
1487 LogicalPlan::Join(Join {
1488 left,
1489 right,
1490 on,
1491 filter,
1492 join_type,
1493 join_constraint,
1494 null_equality,
1495 ..
1496 }) => {
1497 let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1498 left.as_ref(),
1499 extension_codec,
1500 )?;
1501 let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1502 right.as_ref(),
1503 extension_codec,
1504 )?;
1505 let (left_join_key, right_join_key) = on
1506 .iter()
1507 .map(|(l, r)| {
1508 Ok((
1509 serialize_expr(l, extension_codec)?,
1510 serialize_expr(r, extension_codec)?,
1511 ))
1512 })
1513 .collect::<Result<Vec<_>, ToProtoError>>()?
1514 .into_iter()
1515 .unzip();
1516 let join_type: protobuf::JoinType = join_type.to_owned().into();
1517 let join_constraint: protobuf::JoinConstraint =
1518 join_constraint.to_owned().into();
1519 let null_equality: protobuf::NullEquality =
1520 null_equality.to_owned().into();
1521 let filter = filter
1522 .as_ref()
1523 .map(|e| serialize_expr(e, extension_codec).map(Box::new))
1524 .map_or(Ok(None), |v| v.map(Some))?;
1525 Ok(LogicalPlanNode {
1526 logical_plan_type: Some(LogicalPlanType::Join(Box::new(
1527 protobuf::JoinNode {
1528 left: Some(Box::new(left)),
1529 right: Some(Box::new(right)),
1530 join_type: join_type.into(),
1531 join_constraint: join_constraint.into(),
1532 left_join_key,
1533 right_join_key,
1534 null_equality: null_equality.into(),
1535 filter,
1536 },
1537 ))),
1538 })
1539 }
1540 LogicalPlan::Subquery(subquery) => {
1541 LogicalPlanNode::try_from_logical_plan(
1545 &subquery.subquery,
1546 extension_codec,
1547 )
1548 }
1549 LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
1550 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1551 input.as_ref(),
1552 extension_codec,
1553 )?;
1554 Ok(LogicalPlanNode {
1555 logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
1556 protobuf::SubqueryAliasNode {
1557 input: Some(Box::new(input)),
1558 alias: Some((*alias).clone().into()),
1559 },
1560 ))),
1561 })
1562 }
1563 LogicalPlan::Limit(limit) => {
1564 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1565 limit.input.as_ref(),
1566 extension_codec,
1567 )?;
1568 let SkipType::Literal(skip) = limit.get_skip_type()? else {
1569 return Err(proto_error(
1570 "LogicalPlan::Limit only supports literal skip values",
1571 ));
1572 };
1573 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
1574 return Err(proto_error(
1575 "LogicalPlan::Limit only supports literal fetch values",
1576 ));
1577 };
1578
1579 Ok(LogicalPlanNode {
1580 logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
1581 protobuf::LimitNode {
1582 input: Some(Box::new(input)),
1583 skip: skip as i64,
1584 fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
1585 },
1586 ))),
1587 })
1588 }
1589 LogicalPlan::Sort(Sort { input, expr, fetch }) => {
1590 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1591 input.as_ref(),
1592 extension_codec,
1593 )?;
1594 let sort_expr: Vec<protobuf::SortExprNode> =
1595 serialize_sorts(expr, extension_codec)?;
1596 Ok(LogicalPlanNode {
1597 logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
1598 protobuf::SortNode {
1599 input: Some(Box::new(input)),
1600 expr: sort_expr,
1601 fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
1602 },
1603 ))),
1604 })
1605 }
1606 LogicalPlan::Repartition(Repartition {
1607 input,
1608 partitioning_scheme,
1609 }) => {
1610 use datafusion_expr::Partitioning;
1611 let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
1612 input.as_ref(),
1613 extension_codec,
1614 )?;
1615
1616 use protobuf::repartition_node::PartitionMethod;
1619
1620 let pb_partition_method = match partitioning_scheme {
1621 Partitioning::Hash(exprs, partition_count) => {
1622 PartitionMethod::Hash(protobuf::HashRepartition {
1623 hash_expr: serialize_exprs(exprs, extension_codec)?,
1624 partition_count: *partition_count as u64,
1625 })
1626 }
1627 Partitioning::RoundRobinBatch(partition_count) => {
1628 PartitionMethod::RoundRobin(*partition_count as u64)
1629 }
1630 Partitioning::DistributeBy(_) => {
1631 return not_impl_err!("DistributeBy");
1632 }
1633 };
1634
1635 Ok(LogicalPlanNode {
1636 logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
1637 protobuf::RepartitionNode {
1638 input: Some(Box::new(input)),
1639 partition_method: Some(pb_partition_method),
1640 },
1641 ))),
1642 })
1643 }
1644 LogicalPlan::EmptyRelation(EmptyRelation {
1645 produce_one_row, ..
1646 }) => Ok(LogicalPlanNode {
1647 logical_plan_type: Some(LogicalPlanType::EmptyRelation(
1648 protobuf::EmptyRelationNode {
1649 produce_one_row: *produce_one_row,
1650 },
1651 )),
1652 }),
1653 LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
1654 CreateExternalTable {
1655 name,
1656 location,
1657 file_type,
1658 schema: df_schema,
1659 table_partition_cols,
1660 if_not_exists,
1661 or_replace,
1662 definition,
1663 order_exprs,
1664 unbounded,
1665 options,
1666 constraints,
1667 column_defaults,
1668 temporary,
1669 },
1670 )) => {
1671 let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
1672 for order in order_exprs {
1673 let temp = SortExprNodeCollection {
1674 sort_expr_nodes: serialize_sorts(order, extension_codec)?,
1675 };
1676 converted_order_exprs.push(temp);
1677 }
1678
1679 let mut converted_column_defaults =
1680 HashMap::with_capacity(column_defaults.len());
1681 for (col_name, expr) in column_defaults {
1682 converted_column_defaults
1683 .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
1684 }
1685
1686 Ok(LogicalPlanNode {
1687 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
1688 protobuf::CreateExternalTableNode {
1689 name: Some(name.clone().into()),
1690 location: location.clone(),
1691 file_type: file_type.clone(),
1692 schema: Some(df_schema.try_into()?),
1693 table_partition_cols: table_partition_cols.clone(),
1694 if_not_exists: *if_not_exists,
1695 or_replace: *or_replace,
1696 temporary: *temporary,
1697 order_exprs: converted_order_exprs,
1698 definition: definition.clone().unwrap_or_default(),
1699 unbounded: *unbounded,
1700 options: options.clone(),
1701 constraints: Some(constraints.clone().into()),
1702 column_defaults: converted_column_defaults,
1703 },
1704 )),
1705 })
1706 }
1707 LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
1708 name,
1709 input,
1710 or_replace,
1711 definition,
1712 temporary,
1713 })) => Ok(LogicalPlanNode {
1714 logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
1715 protobuf::CreateViewNode {
1716 name: Some(name.clone().into()),
1717 input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
1718 input,
1719 extension_codec,
1720 )?)),
1721 or_replace: *or_replace,
1722 temporary: *temporary,
1723 definition: definition.clone().unwrap_or_default(),
1724 },
1725 ))),
1726 }),
1727 LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
1728 CreateCatalogSchema {
1729 schema_name,
1730 if_not_exists,
1731 schema: df_schema,
1732 },
1733 )) => Ok(LogicalPlanNode {
1734 logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
1735 protobuf::CreateCatalogSchemaNode {
1736 schema_name: schema_name.clone(),
1737 if_not_exists: *if_not_exists,
1738 schema: Some(df_schema.try_into()?),
1739 },
1740 )),
1741 }),
1742 LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
1743 catalog_name,
1744 if_not_exists,
1745 schema: df_schema,
1746 })) => Ok(LogicalPlanNode {
1747 logical_plan_type: Some(LogicalPlanType::CreateCatalog(
1748 protobuf::CreateCatalogNode {
1749 catalog_name: catalog_name.clone(),
1750 if_not_exists: *if_not_exists,
1751 schema: Some(df_schema.try_into()?),
1752 },
1753 )),
1754 }),
1755 LogicalPlan::Analyze(a) => {
1756 let input = LogicalPlanNode::try_from_logical_plan(
1757 a.input.as_ref(),
1758 extension_codec,
1759 )?;
1760 Ok(LogicalPlanNode {
1761 logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
1762 protobuf::AnalyzeNode {
1763 input: Some(Box::new(input)),
1764 verbose: a.verbose,
1765 },
1766 ))),
1767 })
1768 }
1769 LogicalPlan::Explain(a) => {
1770 let input = LogicalPlanNode::try_from_logical_plan(
1771 a.plan.as_ref(),
1772 extension_codec,
1773 )?;
1774 Ok(LogicalPlanNode {
1775 logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
1776 protobuf::ExplainNode {
1777 input: Some(Box::new(input)),
1778 verbose: a.verbose,
1779 format: match &a.explain_format {
1780 ExplainFormat::Indent => protobuf::ExplainFormat::Indent,
1781 ExplainFormat::Tree => protobuf::ExplainFormat::Tree,
1782 ExplainFormat::PostgresJSON => {
1783 protobuf::ExplainFormat::Pgjson
1784 }
1785 ExplainFormat::Graphviz => {
1786 protobuf::ExplainFormat::Graphviz
1787 }
1788 }
1789 .into(),
1790 },
1791 ))),
1792 })
1793 }
1794 LogicalPlan::Union(union) => {
1795 let inputs: Vec<LogicalPlanNode> = union
1796 .inputs
1797 .iter()
1798 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1799 .collect::<Result<_>>()?;
1800 Ok(LogicalPlanNode {
1801 logical_plan_type: Some(LogicalPlanType::Union(
1802 protobuf::UnionNode { inputs },
1803 )),
1804 })
1805 }
1806 LogicalPlan::Extension(extension) => {
1807 let mut buf: Vec<u8> = vec![];
1808 extension_codec.try_encode(extension, &mut buf)?;
1809
1810 let inputs: Vec<LogicalPlanNode> = extension
1811 .node
1812 .inputs()
1813 .iter()
1814 .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
1815 .collect::<Result<_>>()?;
1816
1817 Ok(LogicalPlanNode {
1818 logical_plan_type: Some(LogicalPlanType::Extension(
1819 LogicalExtensionNode { node: buf, inputs },
1820 )),
1821 })
1822 }
1823 LogicalPlan::Statement(Statement::Prepare(Prepare {
1824 name,
1825 fields,
1826 input,
1827 })) => {
1828 let input =
1829 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1830 Ok(LogicalPlanNode {
1831 logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
1832 protobuf::PrepareNode {
1833 name: name.clone(),
1834 input: Some(Box::new(input)),
1835 data_types: fields
1837 .iter()
1838 .map(|f| f.data_type().try_into())
1839 .collect::<Result<Vec<_>, _>>()?,
1840 fields: fields
1842 .iter()
1843 .map(|f| f.as_ref().try_into())
1844 .collect::<Result<Vec<_>, _>>()?,
1845 },
1846 ))),
1847 })
1848 }
1849 LogicalPlan::Unnest(Unnest {
1850 input,
1851 exec_columns,
1852 list_type_columns,
1853 struct_type_columns,
1854 dependency_indices,
1855 schema,
1856 options,
1857 }) => {
1858 let input =
1859 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1860 let proto_unnest_list_items = list_type_columns
1861 .iter()
1862 .map(|(index, ul)| ColumnUnnestListItem {
1863 input_index: *index as _,
1864 recursion: Some(ColumnUnnestListRecursion {
1865 output_column: Some(ul.output_column.to_owned().into()),
1866 depth: ul.depth as _,
1867 }),
1868 })
1869 .collect();
1870 Ok(LogicalPlanNode {
1871 logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
1872 protobuf::UnnestNode {
1873 input: Some(Box::new(input)),
1874 exec_columns: exec_columns
1875 .iter()
1876 .map(|col| col.into())
1877 .collect(),
1878 list_type_columns: proto_unnest_list_items,
1879 struct_type_columns: struct_type_columns
1880 .iter()
1881 .map(|c| *c as u64)
1882 .collect(),
1883 dependency_indices: dependency_indices
1884 .iter()
1885 .map(|c| *c as u64)
1886 .collect(),
1887 schema: Some(schema.try_into()?),
1888 options: Some(options.into()),
1889 },
1890 ))),
1891 })
1892 }
1893 LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
1894 "LogicalPlan serde is not yet implemented for CreateMemoryTable",
1895 )),
1896 LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
1897 "LogicalPlan serde is not yet implemented for CreateIndex",
1898 )),
1899 LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
1900 "LogicalPlan serde is not yet implemented for DropTable",
1901 )),
1902 LogicalPlan::Ddl(DdlStatement::DropView(DropView {
1903 name,
1904 if_exists,
1905 schema,
1906 })) => Ok(LogicalPlanNode {
1907 logical_plan_type: Some(LogicalPlanType::DropView(
1908 protobuf::DropViewNode {
1909 name: Some(name.clone().into()),
1910 if_exists: *if_exists,
1911 schema: Some(schema.try_into()?),
1912 },
1913 )),
1914 }),
1915 LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
1916 "LogicalPlan serde is not yet implemented for DropCatalogSchema",
1917 )),
1918 LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
1919 "LogicalPlan serde is not yet implemented for CreateFunction",
1920 )),
1921 LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
1922 "LogicalPlan serde is not yet implemented for DropFunction",
1923 )),
1924 LogicalPlan::Statement(_) => Err(proto_error(
1925 "LogicalPlan serde is not yet implemented for Statement",
1926 )),
1927 LogicalPlan::Dml(DmlStatement {
1928 table_name,
1929 target,
1930 op,
1931 input,
1932 ..
1933 }) => {
1934 let input =
1935 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1936 let dml_type: dml_node::Type = op.into();
1937 Ok(LogicalPlanNode {
1938 logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
1939 input: Some(Box::new(input)),
1940 target: Some(Box::new(from_table_source(
1941 table_name.clone(),
1942 Arc::clone(target),
1943 extension_codec,
1944 )?)),
1945 table_name: Some(table_name.clone().into()),
1946 dml_type: dml_type.into(),
1947 }))),
1948 })
1949 }
1950 LogicalPlan::Copy(dml::CopyTo {
1951 input,
1952 output_url,
1953 file_type,
1954 partition_by,
1955 ..
1956 }) => {
1957 let input =
1958 LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
1959 let mut buf = Vec::new();
1960 extension_codec
1961 .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
1962
1963 Ok(LogicalPlanNode {
1964 logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
1965 protobuf::CopyToNode {
1966 input: Some(Box::new(input)),
1967 output_url: output_url.to_string(),
1968 file_type: buf,
1969 partition_by: partition_by.clone(),
1970 },
1971 ))),
1972 })
1973 }
1974 LogicalPlan::DescribeTable(_) => Err(proto_error(
1975 "LogicalPlan serde is not yet implemented for DescribeTable",
1976 )),
1977 LogicalPlan::RecursiveQuery(recursive) => {
1978 let static_term = LogicalPlanNode::try_from_logical_plan(
1979 recursive.static_term.as_ref(),
1980 extension_codec,
1981 )?;
1982 let recursive_term = LogicalPlanNode::try_from_logical_plan(
1983 recursive.recursive_term.as_ref(),
1984 extension_codec,
1985 )?;
1986
1987 Ok(LogicalPlanNode {
1988 logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
1989 protobuf::RecursiveQueryNode {
1990 name: recursive.name.clone(),
1991 static_term: Some(Box::new(static_term)),
1992 recursive_term: Some(Box::new(recursive_term)),
1993 is_distinct: recursive.is_distinct,
1994 },
1995 ))),
1996 })
1997 }
1998 }
1999 }
2000}