1use std::any::Any;
19use std::cell::RefCell;
20use std::collections::HashMap;
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use arrow::compute::SortOptions;
25use arrow::datatypes::{IntervalMonthDayNanoType, Schema, SchemaRef};
26use datafusion_catalog::memory::MemorySourceConfig;
27use datafusion_common::config::CsvOptions;
28use datafusion_common::{
29 DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
30};
31#[cfg(feature = "parquet")]
32use datafusion_datasource::file::FileSource;
33use datafusion_datasource::file_compression_type::FileCompressionType;
34use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
35use datafusion_datasource::sink::DataSinkExec;
36use datafusion_datasource::source::{DataSource, DataSourceExec};
37use datafusion_datasource_arrow::source::ArrowSource;
38#[cfg(feature = "avro")]
39use datafusion_datasource_avro::source::AvroSource;
40use datafusion_datasource_csv::file_format::CsvSink;
41use datafusion_datasource_csv::source::CsvSource;
42use datafusion_datasource_json::file_format::JsonSink;
43use datafusion_datasource_json::source::JsonSource;
44#[cfg(feature = "parquet")]
45use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
46#[cfg(feature = "parquet")]
47use datafusion_datasource_parquet::file_format::ParquetSink;
48#[cfg(feature = "parquet")]
49use datafusion_datasource_parquet::source::ParquetSource;
50#[cfg(feature = "parquet")]
51use datafusion_execution::object_store::ObjectStoreUrl;
52use datafusion_execution::{FunctionRegistry, TaskContext};
53use datafusion_expr::execution_props::{ScalarSubqueryResults, SubqueryIndex};
54use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
55use datafusion_functions_table::generate_series::{
56 Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue,
57};
58use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
59use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr;
60use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
61use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
62use datafusion_physical_plan::aggregates::{
63 AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy,
64};
65use datafusion_physical_plan::analyze::AnalyzeExec;
66use datafusion_physical_plan::async_func::AsyncFuncExec;
67use datafusion_physical_plan::buffer::BufferExec;
68#[expect(deprecated)]
69use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
70use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
71use datafusion_physical_plan::coop::CooperativeExec;
72use datafusion_physical_plan::empty::EmptyExec;
73use datafusion_physical_plan::explain::ExplainExec;
74use datafusion_physical_plan::expressions::PhysicalSortExpr;
75use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
76use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
77use datafusion_physical_plan::joins::{
78 CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
79 StreamJoinPartitionMode, SymmetricHashJoinExec,
80};
81use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
82use datafusion_physical_plan::memory::LazyMemoryExec;
83use datafusion_physical_plan::metrics::{MetricCategory, MetricType};
84use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
85use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
86use datafusion_physical_plan::repartition::RepartitionExec;
87use datafusion_physical_plan::scalar_subquery::{ScalarSubqueryExec, ScalarSubqueryLink};
88use datafusion_physical_plan::sorts::sort::SortExec;
89use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
90use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
91use datafusion_physical_plan::unnest::{ListUnnest, UnnestExec};
92use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
93use datafusion_physical_plan::{ExecutionPlan, InputOrderMode, PhysicalExpr, WindowExpr};
94use prost::Message;
95use prost::bytes::BufMut;
96
97use self::from_proto::parse_protobuf_partitioning;
98use self::to_proto::serialize_partitioning;
99use crate::common::{byte_to_string, str_to_byte};
100use crate::physical_plan::from_proto::{
101 parse_physical_expr_with_converter, parse_physical_sort_expr,
102 parse_physical_sort_exprs, parse_physical_window_expr,
103 parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto,
104};
105use crate::physical_plan::to_proto::{
106 serialize_file_scan_config, serialize_maybe_filter, serialize_physical_aggr_expr,
107 serialize_physical_expr_with_converter, serialize_physical_sort_exprs,
108 serialize_physical_window_expr, serialize_record_batches,
109};
110use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
111use crate::protobuf::physical_expr_node::ExprType;
112use crate::protobuf::physical_plan_node::PhysicalPlanType;
113use crate::protobuf::{
114 self, ListUnnest as ProtoListUnnest, SortExprNode, SortMergeJoinExecNode,
115 proto_error, window_agg_exec_node,
116};
117use crate::{convert_required, into_required};
118
119pub mod from_proto;
120pub mod to_proto;
121
/// Marker that opens a human-display string carrying an embedded alias.
///
/// Strings written by `encode_human_display_alias` start with this prefix,
/// followed by `<alias byte length>:<alias><display text>`. The prefix begins
/// with U+001F (ASCII unit separator) and carries a `v1` tag — presumably so
/// ordinary display text can never be mistaken for an encoding and the format
/// can be versioned later (NOTE(review): confirm intent).
const HUMAN_DISPLAY_ALIAS_PREFIX: &str = "\u{1f}datafusion_human_display_alias_v1:";
123
124fn encode_human_display_alias(human_display: &str, alias: &str) -> String {
125 format!(
126 "{HUMAN_DISPLAY_ALIAS_PREFIX}{}:{alias}{human_display}",
127 alias.len()
128 )
129}
130
131fn split_human_display_alias<'a>(
132 human_display: &'a str,
133 name: &'a str,
134) -> (&'a str, Option<&'a str>) {
135 if let Some(encoded) = human_display.strip_prefix(HUMAN_DISPLAY_ALIAS_PREFIX)
136 && let Some((alias_len, encoded)) = encoded.split_once(':')
137 && let Ok(alias_len) = alias_len.parse::<usize>()
138 && let Some(alias) = encoded.get(..alias_len)
139 && let Some(human_display) = encoded.get(alias_len..)
140 && alias == name
141 && !human_display.is_empty()
142 {
143 return (human_display, Some(alias));
144 }
145
146 (human_display, None)
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn split_human_display_alias_ignores_mismatched_alias() {
        let encoded = encode_human_display_alias("sum(value)", "revenue");

        assert_eq!(
            split_human_display_alias(&encoded, "other"),
            (encoded.as_str(), None)
        );
    }

    #[test]
    fn split_human_display_alias_keeps_malformed_prefix_literal() {
        let display = format!("{HUMAN_DISPLAY_ALIAS_PREFIX}not-an-encoding");

        assert_eq!(
            split_human_display_alias(&display, "agg"),
            (display.as_str(), None)
        );
    }

    /// The happy path: encoding then splitting with the same alias recovers
    /// both the original display text and the alias.
    #[test]
    fn human_display_alias_round_trips() {
        let encoded = encode_human_display_alias("sum(value)", "revenue");

        assert_eq!(
            split_human_display_alias(&encoded, "revenue"),
            ("sum(value)", Some("revenue"))
        );
    }

    /// The alias is sliced out by byte length, so `:` and multi-byte
    /// characters inside it must not confuse the decoder.
    #[test]
    fn human_display_alias_round_trips_colons_and_multibyte() {
        for alias in ["a:b", "é", "x:é:y"] {
            let encoded = encode_human_display_alias("count(*)", alias);

            assert_eq!(
                split_human_display_alias(&encoded, alias),
                ("count(*)", Some(alias))
            );
        }
    }

    #[test]
    fn split_human_display_alias_rejects_out_of_range_length() {
        // Length points past the end of the payload.
        let past_end = format!("{HUMAN_DISPLAY_ALIAS_PREFIX}99:ab");
        assert_eq!(
            split_human_display_alias(&past_end, "ab"),
            (past_end.as_str(), None)
        );

        // Length 1 lands inside the two-byte `é`, so the alias cannot be sliced.
        let mid_char = format!("{HUMAN_DISPLAY_ALIAS_PREFIX}1:éd");
        assert_eq!(
            split_human_display_alias(&mid_char, "é"),
            (mid_char.as_str(), None)
        );
    }
}
173
174#[derive(Clone)]
180pub struct PhysicalPlanDecodeContext<'a> {
181 task_ctx: &'a TaskContext,
182 codec: &'a dyn PhysicalExtensionCodec,
183 scalar_subquery_results: Option<ScalarSubqueryResults>,
184}
185
186impl<'a> PhysicalPlanDecodeContext<'a> {
187 pub fn new(task_ctx: &'a TaskContext, codec: &'a dyn PhysicalExtensionCodec) -> Self {
189 Self {
190 task_ctx,
191 codec,
192 scalar_subquery_results: None,
193 }
194 }
195
196 pub fn task_ctx(&self) -> &'a TaskContext {
198 self.task_ctx
199 }
200
201 pub fn codec(&self) -> &'a dyn PhysicalExtensionCodec {
203 self.codec
204 }
205
206 pub fn scalar_subquery_results(&self) -> Option<&ScalarSubqueryResults> {
209 self.scalar_subquery_results.as_ref()
210 }
211
212 pub fn with_scalar_subquery_results(
215 &self,
216 scalar_subquery_results: ScalarSubqueryResults,
217 ) -> Self {
218 Self {
219 task_ctx: self.task_ctx,
220 codec: self.codec,
221 scalar_subquery_results: Some(scalar_subquery_results),
222 }
223 }
224}
225
226impl AsExecutionPlan for protobuf::PhysicalPlanNode {
227 fn try_decode(buf: &[u8]) -> Result<Self>
228 where
229 Self: Sized,
230 {
231 protobuf::PhysicalPlanNode::decode(buf).map_err(|e| {
232 internal_datafusion_err!("failed to decode physical plan: {e:?}")
233 })
234 }
235
236 fn try_encode<B>(&self, buf: &mut B) -> Result<()>
237 where
238 B: BufMut,
239 Self: Sized,
240 {
241 self.encode(buf).map_err(|e| {
242 internal_datafusion_err!("failed to encode physical plan: {e:?}")
243 })
244 }
245
246 fn try_into_physical_plan(
247 &self,
248 ctx: &TaskContext,
249 codec: &dyn PhysicalExtensionCodec,
250 ) -> Result<Arc<dyn ExecutionPlan>> {
251 self.try_into_physical_plan_with_converter(
252 ctx,
253 codec,
254 &DefaultPhysicalProtoConverter {},
255 )
256 }
257
258 fn try_from_physical_plan(
259 plan: Arc<dyn ExecutionPlan>,
260 codec: &dyn PhysicalExtensionCodec,
261 ) -> Result<Self>
262 where
263 Self: Sized,
264 {
265 Self::try_from_physical_plan_with_converter(
266 plan,
267 codec,
268 &DefaultPhysicalProtoConverter {},
269 )
270 }
271}
272
impl protobuf::PhysicalPlanNode {
    /// Converts this protobuf node into an [`ExecutionPlan`].
    ///
    /// Builds a fresh [`PhysicalPlanDecodeContext`] from `ctx` and `codec`
    /// (with no scalar subquery results attached) and delegates to
    /// `try_into_physical_plan_with_context`. `proto_converter` decodes the
    /// expressions embedded in the plan.
    pub fn try_into_physical_plan_with_converter(
        &self,
        ctx: &TaskContext,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let decode_ctx = PhysicalPlanDecodeContext::new(ctx, codec);
        self.try_into_physical_plan_with_context(&decode_ctx, proto_converter)
    }

    /// Converts this protobuf node into an [`ExecutionPlan`] using an existing
    /// decode context.
    ///
    /// Dispatches on `physical_plan_type` to the matching `try_into_*` helper.
    /// The match has no catch-all arm, so every [`PhysicalPlanType`] variant is
    /// handled explicitly.
    ///
    /// # Errors
    ///
    /// Returns a protobuf error when `physical_plan_type` is not set, and
    /// propagates any error raised by the per-variant decoder.
    pub(crate) fn try_into_physical_plan_with_context(
        &self,
        ctx: &PhysicalPlanDecodeContext<'_>,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
            proto_error(format!(
                "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
            ))
        })?;
        match plan {
            PhysicalPlanType::Explain(explain) => {
                self.try_into_explain_physical_plan(explain, ctx, proto_converter)
            }
            PhysicalPlanType::Projection(projection) => {
                self.try_into_projection_physical_plan(projection, ctx, proto_converter)
            }
            PhysicalPlanType::Filter(filter) => {
                self.try_into_filter_physical_plan(filter, ctx, proto_converter)
            }
            PhysicalPlanType::CsvScan(scan) => {
                self.try_into_csv_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::JsonScan(scan) => {
                self.try_into_json_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::ParquetScan(scan) => {
                self.try_into_parquet_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::AvroScan(scan) => {
                self.try_into_avro_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::MemoryScan(scan) => {
                self.try_into_memory_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::ArrowScan(scan) => {
                self.try_into_arrow_scan_physical_plan(scan, ctx, proto_converter)
            }
            PhysicalPlanType::CoalesceBatches(coalesce_batches) => self
                .try_into_coalesce_batches_physical_plan(
                    coalesce_batches,
                    ctx,
                    proto_converter,
                ),
            // `Merge` carries a `CoalescePartitionsExec`.
            PhysicalPlanType::Merge(merge) => {
                self.try_into_merge_physical_plan(merge, ctx, proto_converter)
            }
            PhysicalPlanType::Repartition(repart) => {
                self.try_into_repartition_physical_plan(repart, ctx, proto_converter)
            }
            PhysicalPlanType::GlobalLimit(limit) => {
                self.try_into_global_limit_physical_plan(limit, ctx, proto_converter)
            }
            PhysicalPlanType::LocalLimit(limit) => {
                self.try_into_local_limit_physical_plan(limit, ctx, proto_converter)
            }
            PhysicalPlanType::Window(window_agg) => {
                self.try_into_window_physical_plan(window_agg, ctx, proto_converter)
            }
            PhysicalPlanType::Aggregate(hash_agg) => {
                self.try_into_aggregate_physical_plan(hash_agg, ctx, proto_converter)
            }
            PhysicalPlanType::HashJoin(hashjoin) => {
                self.try_into_hash_join_physical_plan(hashjoin, ctx, proto_converter)
            }
            PhysicalPlanType::SymmetricHashJoin(sym_join) => self
                .try_into_symmetric_hash_join_physical_plan(
                    sym_join,
                    ctx,
                    proto_converter,
                ),
            PhysicalPlanType::Union(union) => {
                self.try_into_union_physical_plan(union, ctx, proto_converter)
            }
            PhysicalPlanType::Interleave(interleave) => {
                self.try_into_interleave_physical_plan(interleave, ctx, proto_converter)
            }
            PhysicalPlanType::CrossJoin(crossjoin) => {
                self.try_into_cross_join_physical_plan(crossjoin, ctx, proto_converter)
            }
            PhysicalPlanType::Empty(empty) => {
                self.try_into_empty_physical_plan(empty, ctx, proto_converter)
            }
            // The placeholder-row decoder takes no expression converter.
            PhysicalPlanType::PlaceholderRow(placeholder) => {
                self.try_into_placeholder_row_physical_plan(placeholder, ctx)
            }
            PhysicalPlanType::Sort(sort) => {
                self.try_into_sort_physical_plan(sort, ctx, proto_converter)
            }
            PhysicalPlanType::SortPreservingMerge(sort) => self
                .try_into_sort_preserving_merge_physical_plan(sort, ctx, proto_converter),
            PhysicalPlanType::Extension(extension) => {
                self.try_into_extension_physical_plan(extension, ctx, proto_converter)
            }
            PhysicalPlanType::NestedLoopJoin(join) => {
                self.try_into_nested_loop_join_physical_plan(join, ctx, proto_converter)
            }
            PhysicalPlanType::Analyze(analyze) => {
                self.try_into_analyze_physical_plan(analyze, ctx, proto_converter)
            }
            PhysicalPlanType::JsonSink(sink) => {
                self.try_into_json_sink_physical_plan(sink, ctx, proto_converter)
            }
            PhysicalPlanType::CsvSink(sink) => {
                self.try_into_csv_sink_physical_plan(sink, ctx, proto_converter)
            }
            // `sink` may go unused when the `parquet` feature is disabled.
            #[cfg_attr(not(feature = "parquet"), allow(unused_variables))]
            PhysicalPlanType::ParquetSink(sink) => {
                self.try_into_parquet_sink_physical_plan(sink, ctx, proto_converter)
            }
            PhysicalPlanType::Unnest(unnest) => {
                self.try_into_unnest_physical_plan(unnest, ctx, proto_converter)
            }
            PhysicalPlanType::Cooperative(cooperative) => {
                self.try_into_cooperative_physical_plan(cooperative, ctx, proto_converter)
            }
            // Decoded from the node alone; no context or converter is passed.
            PhysicalPlanType::GenerateSeries(generate_series) => {
                self.try_into_generate_series_physical_plan(generate_series)
            }
            PhysicalPlanType::SortMergeJoin(sort_join) => {
                self.try_into_sort_join(sort_join, ctx, proto_converter)
            }
            PhysicalPlanType::AsyncFunc(async_func) => {
                self.try_into_async_func_physical_plan(async_func, ctx, proto_converter)
            }
            PhysicalPlanType::Buffer(buffer) => {
                self.try_into_buffer_physical_plan(buffer, ctx, proto_converter)
            }
            PhysicalPlanType::ScalarSubquery(sq) => {
                self.try_into_scalar_subquery_physical_plan(sq, ctx, proto_converter)
            }
        }
    }

    /// Serializes `plan` into a protobuf node.
    ///
    /// Built-in operators are recognized by downcasting to their concrete
    /// types and serialized by the matching `try_from_*` helper. The
    /// `DataSourceExec`, `DataSinkExec` and `LazyMemoryExec` helpers return an
    /// `Option`. When they yield `None`, the plan falls through to the later
    /// checks. Any plan not handled by a built-in helper is handed to
    /// `codec.try_encode` and wrapped in a `PhysicalExtensionNode` whose
    /// `inputs` are the recursively serialized children.
    ///
    /// # Errors
    ///
    /// Propagates errors from the per-operator helpers and from serializing
    /// children. Returns an internal error when the extension codec cannot
    /// encode the plan either.
    pub fn try_from_physical_plan_with_converter(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
        proto_converter: &dyn PhysicalProtoConverterExtension,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Keep an owned handle: `plan` is shadowed by an `&dyn Any` view below,
        // and the codec fallback at the end needs the `Arc`.
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_ref() as &dyn Any;

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(exec, codec);
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortMergeJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_merge_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(empty, codec);
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty, codec,
            );
        }

        // `CoalesceBatchesExec` is deprecated but still serialized for
        // compatibility with plans that contain it.
        #[expect(deprecated)]
        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                codec,
                proto_converter,
            );
        }

        // Only returns when the helper produced a node; `None` falls through
        // so the extension codec gets a chance to encode this data source.
        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                codec,
                proto_converter,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Like data sources, unsupported sinks fall through to the codec.
        if let Some(exec) = plan.downcast_ref::<DataSinkExec>()
            && let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                codec,
                proto_converter,
            )?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CooperativeExec>() {
            return protobuf::PhysicalPlanNode::try_from_cooperative_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Only some lazy memory generators have a built-in encoding; others
        // fall through to the codec.
        if let Some(exec) = plan.downcast_ref::<LazyMemoryExec>()
            && let Some(node) =
                protobuf::PhysicalPlanNode::try_from_lazy_memory_exec(exec)?
        {
            return Ok(node);
        }

        if let Some(exec) = plan.downcast_ref::<AsyncFuncExec>() {
            return protobuf::PhysicalPlanNode::try_from_async_func_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BufferExec>() {
            return protobuf::PhysicalPlanNode::try_from_buffer_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ScalarSubqueryExec>() {
            return protobuf::PhysicalPlanNode::try_from_scalar_subquery_exec(
                exec,
                codec,
                proto_converter,
            );
        }

        // Fallback: let the extension codec encode the node itself, and
        // serialize its children recursively as the extension's inputs.
        let mut buf: Vec<u8> = vec![];
        match codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
                            i,
                            codec,
                            proto_converter,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }
}
699
700impl protobuf::PhysicalPlanNode {
701 fn try_into_explain_physical_plan(
702 &self,
703 explain: &protobuf::ExplainExecNode,
704 _ctx: &PhysicalPlanDecodeContext<'_>,
705 _proto_converter: &dyn PhysicalProtoConverterExtension,
706 ) -> Result<Arc<dyn ExecutionPlan>> {
707 Ok(Arc::new(ExplainExec::new(
708 Arc::new(explain.schema.as_ref().unwrap().try_into()?),
709 explain
710 .stringified_plans
711 .iter()
712 .map(|plan| plan.into())
713 .collect(),
714 explain.verbose,
715 )))
716 }
717
718 fn try_into_projection_physical_plan(
719 &self,
720 projection: &protobuf::ProjectionExecNode,
721 ctx: &PhysicalPlanDecodeContext<'_>,
722 proto_converter: &dyn PhysicalProtoConverterExtension,
723 ) -> Result<Arc<dyn ExecutionPlan>> {
724 let input: Arc<dyn ExecutionPlan> =
725 into_physical_plan(&projection.input, ctx, proto_converter)?;
726 let exprs = projection
727 .expr
728 .iter()
729 .zip(projection.expr_name.iter())
730 .map(|(expr, name)| {
731 Ok((
732 proto_converter.proto_to_physical_expr(
733 expr,
734 input.schema().as_ref(),
735 ctx,
736 )?,
737 name.to_string(),
738 ))
739 })
740 .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
741 let proj_exprs: Vec<ProjectionExpr> = exprs
742 .into_iter()
743 .map(|(expr, alias)| ProjectionExpr { expr, alias })
744 .collect();
745 Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input)?))
746 }
747
748 fn try_into_filter_physical_plan(
749 &self,
750 filter: &protobuf::FilterExecNode,
751 ctx: &PhysicalPlanDecodeContext<'_>,
752 proto_converter: &dyn PhysicalProtoConverterExtension,
753 ) -> Result<Arc<dyn ExecutionPlan>> {
754 let input: Arc<dyn ExecutionPlan> =
755 into_physical_plan(&filter.input, ctx, proto_converter)?;
756
757 let predicate = filter
758 .expr
759 .as_ref()
760 .map(|expr| {
761 proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
762 })
763 .transpose()?
764 .ok_or_else(|| {
765 internal_datafusion_err!(
766 "filter (FilterExecNode) in PhysicalPlanNode is missing."
767 )
768 })?;
769
770 let filter_selectivity = filter.default_filter_selectivity.try_into();
771 let num_fields = input.schema().fields().len();
776 let mut is_full_projection = filter.projection.len() == num_fields;
777 let mut projection_vec: Vec<usize> = Vec::with_capacity(filter.projection.len());
778 for (i, idx) in filter.projection.iter().enumerate() {
779 let idx = *idx as usize;
780 is_full_projection &= idx == i;
781 projection_vec.push(idx);
782 }
783 let projection = if is_full_projection {
784 None
785 } else {
786 Some(projection_vec)
787 };
788 let filter = FilterExecBuilder::new(predicate, input)
789 .apply_projection(projection)?
790 .with_batch_size(filter.batch_size as usize)
791 .with_fetch(filter.fetch.map(|f| f as usize))
792 .build()?;
793 match filter_selectivity {
794 Ok(filter_selectivity) => Ok(Arc::new(
795 filter.with_default_selectivity(filter_selectivity)?,
796 )),
797 Err(_) => Err(internal_datafusion_err!(
798 "filter_selectivity in PhysicalPlanNode is invalid "
799 )),
800 }
801 }
802
803 fn try_into_csv_scan_physical_plan(
804 &self,
805 scan: &protobuf::CsvScanExecNode,
806 ctx: &PhysicalPlanDecodeContext<'_>,
807 proto_converter: &dyn PhysicalProtoConverterExtension,
808 ) -> Result<Arc<dyn ExecutionPlan>> {
809 let escape =
810 if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(escape)) =
811 &scan.optional_escape
812 {
813 Some(str_to_byte(escape, "escape")?)
814 } else {
815 None
816 };
817
818 let comment = if let Some(
819 protobuf::csv_scan_exec_node::OptionalComment::Comment(comment),
820 ) = &scan.optional_comment
821 {
822 Some(str_to_byte(comment, "comment")?)
823 } else {
824 None
825 };
826
827 let table_schema =
829 parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
830
831 let csv_options = CsvOptions {
832 has_header: Some(scan.has_header),
833 delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
834 quote: str_to_byte(&scan.quote, "quote")?,
835 newlines_in_values: Some(scan.newlines_in_values),
836 ..Default::default()
837 };
838 let source = Arc::new(
839 CsvSource::new(table_schema)
840 .with_csv_options(csv_options)
841 .with_escape(escape)
842 .with_comment(comment),
843 );
844
845 let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
846 scan.base_conf.as_ref().unwrap(),
847 ctx,
848 proto_converter,
849 source,
850 )?)
851 .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
852 .build();
853 Ok(DataSourceExec::from_data_source(conf))
854 }
855
856 fn try_into_json_scan_physical_plan(
857 &self,
858 scan: &protobuf::JsonScanExecNode,
859 ctx: &PhysicalPlanDecodeContext<'_>,
860 proto_converter: &dyn PhysicalProtoConverterExtension,
861 ) -> Result<Arc<dyn ExecutionPlan>> {
862 let base_conf = scan.base_conf.as_ref().unwrap();
863 let table_schema = parse_table_schema_from_proto(base_conf)?;
864 let scan_conf = parse_protobuf_file_scan_config(
865 base_conf,
866 ctx,
867 proto_converter,
868 Arc::new(JsonSource::new(table_schema)),
869 )?;
870 Ok(DataSourceExec::from_data_source(scan_conf))
871 }
872
873 fn try_into_arrow_scan_physical_plan(
874 &self,
875 scan: &protobuf::ArrowScanExecNode,
876 ctx: &PhysicalPlanDecodeContext<'_>,
877 proto_converter: &dyn PhysicalProtoConverterExtension,
878 ) -> Result<Arc<dyn ExecutionPlan>> {
879 let base_conf = scan.base_conf.as_ref().ok_or_else(|| {
880 internal_datafusion_err!("base_conf in ArrowScanExecNode is missing.")
881 })?;
882 let table_schema = parse_table_schema_from_proto(base_conf)?;
883 let scan_conf = parse_protobuf_file_scan_config(
884 base_conf,
885 ctx,
886 proto_converter,
887 Arc::new(ArrowSource::new_file_source(table_schema)),
888 )?;
889 Ok(DataSourceExec::from_data_source(scan_conf))
890 }
891
892 #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
893 fn try_into_parquet_scan_physical_plan(
894 &self,
895 scan: &protobuf::ParquetScanExecNode,
896 ctx: &PhysicalPlanDecodeContext<'_>,
897 proto_converter: &dyn PhysicalProtoConverterExtension,
898 ) -> Result<Arc<dyn ExecutionPlan>> {
899 #[cfg(feature = "parquet")]
900 {
901 let schema = from_proto::parse_protobuf_file_scan_schema(
902 scan.base_conf.as_ref().unwrap(),
903 )?;
904
905 let base_conf = scan.base_conf.as_ref().unwrap();
907 let predicate_schema = if !base_conf.projection.is_empty() {
908 let projected_fields: Vec<_> = base_conf
910 .projection
911 .iter()
912 .map(|&i| schema.field(i as usize).clone())
913 .collect();
914 Arc::new(Schema::new(projected_fields))
915 } else {
916 schema
917 };
918
919 let predicate = scan
920 .predicate
921 .as_ref()
922 .map(|expr| {
923 proto_converter.proto_to_physical_expr(
924 expr,
925 predicate_schema.as_ref(),
926 ctx,
927 )
928 })
929 .transpose()?;
930 let mut options = datafusion_common::config::TableParquetOptions::default();
931
932 if let Some(table_options) = scan.parquet_options.as_ref() {
933 options = table_options.try_into()?;
934 }
935
936 let table_schema = parse_table_schema_from_proto(base_conf)?;
938 let object_store_url = match base_conf.object_store_url.is_empty() {
939 false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
940 true => ObjectStoreUrl::local_filesystem(),
941 };
942 let store = ctx
943 .task_ctx()
944 .runtime_env()
945 .object_store(object_store_url)?;
946 let metadata_cache = ctx
947 .task_ctx()
948 .runtime_env()
949 .cache_manager
950 .get_file_metadata_cache();
951 let reader_factory =
952 Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
953
954 let mut source = ParquetSource::new(table_schema)
955 .with_parquet_file_reader_factory(reader_factory)
956 .with_table_parquet_options(options);
957
958 if let Some(predicate) = predicate {
959 source = source.with_predicate(predicate);
960 }
961 let base_config = parse_protobuf_file_scan_config(
962 base_conf,
963 ctx,
964 proto_converter,
965 Arc::new(source),
966 )?;
967 Ok(DataSourceExec::from_data_source(base_config))
968 }
969 #[cfg(not(feature = "parquet"))]
970 panic!(
971 "Unable to process a Parquet PhysicalPlan when `parquet` feature is not enabled"
972 )
973 }
974
975 #[cfg_attr(not(feature = "avro"), expect(unused_variables))]
976 fn try_into_avro_scan_physical_plan(
977 &self,
978 scan: &protobuf::AvroScanExecNode,
979 ctx: &PhysicalPlanDecodeContext<'_>,
980 proto_converter: &dyn PhysicalProtoConverterExtension,
981 ) -> Result<Arc<dyn ExecutionPlan>> {
982 #[cfg(feature = "avro")]
983 {
984 let table_schema =
985 parse_table_schema_from_proto(scan.base_conf.as_ref().unwrap())?;
986 let conf = parse_protobuf_file_scan_config(
987 scan.base_conf.as_ref().unwrap(),
988 ctx,
989 proto_converter,
990 Arc::new(AvroSource::new(table_schema)),
991 )?;
992 Ok(DataSourceExec::from_data_source(conf))
993 }
994
995 #[cfg(not(feature = "avro"))]
996 panic!("Unable to process a Avro PhysicalPlan when `avro` feature is not enabled")
997 }
998
999 fn try_into_memory_scan_physical_plan(
1000 &self,
1001 scan: &protobuf::MemoryScanExecNode,
1002 ctx: &PhysicalPlanDecodeContext<'_>,
1003 proto_converter: &dyn PhysicalProtoConverterExtension,
1004 ) -> Result<Arc<dyn ExecutionPlan>> {
1005 let partitions = scan
1006 .partitions
1007 .iter()
1008 .map(|p| parse_record_batches(p))
1009 .collect::<Result<Vec<_>>>()?;
1010
1011 let proto_schema = scan.schema.as_ref().ok_or_else(|| {
1012 internal_datafusion_err!("schema in MemoryScanExecNode is missing.")
1013 })?;
1014 let schema: SchemaRef = SchemaRef::new(proto_schema.try_into()?);
1015
1016 let projection = if !scan.projection.is_empty() {
1017 Some(
1018 scan.projection
1019 .iter()
1020 .map(|i| *i as usize)
1021 .collect::<Vec<_>>(),
1022 )
1023 } else {
1024 None
1025 };
1026
1027 let mut sort_information = vec![];
1028 for ordering in &scan.sort_information {
1029 let sort_exprs = parse_physical_sort_exprs(
1030 &ordering.physical_sort_expr_nodes,
1031 ctx,
1032 &schema,
1033 proto_converter,
1034 )?;
1035 sort_information.extend(LexOrdering::new(sort_exprs));
1036 }
1037
1038 let source = MemorySourceConfig::try_new(&partitions, schema, projection)?
1039 .with_limit(scan.fetch.map(|f| f as usize))
1040 .with_show_sizes(scan.show_sizes);
1041
1042 let source = source.try_with_sort_information(sort_information)?;
1043
1044 Ok(DataSourceExec::from_data_source(source))
1045 }
1046
1047 fn try_into_coalesce_batches_physical_plan(
1048 &self,
1049 coalesce_batches: &protobuf::CoalesceBatchesExecNode,
1050 ctx: &PhysicalPlanDecodeContext<'_>,
1051 proto_converter: &dyn PhysicalProtoConverterExtension,
1052 ) -> Result<Arc<dyn ExecutionPlan>> {
1053 let input: Arc<dyn ExecutionPlan> =
1054 into_physical_plan(&coalesce_batches.input, ctx, proto_converter)?;
1055 Ok(Arc::new(
1056 #[expect(deprecated)]
1057 CoalesceBatchesExec::new(input, coalesce_batches.target_batch_size as usize)
1058 .with_fetch(coalesce_batches.fetch.map(|f| f as usize)),
1059 ))
1060 }
1061
1062 fn try_into_merge_physical_plan(
1063 &self,
1064 merge: &protobuf::CoalescePartitionsExecNode,
1065 ctx: &PhysicalPlanDecodeContext<'_>,
1066 proto_converter: &dyn PhysicalProtoConverterExtension,
1067 ) -> Result<Arc<dyn ExecutionPlan>> {
1068 let input: Arc<dyn ExecutionPlan> =
1069 into_physical_plan(&merge.input, ctx, proto_converter)?;
1070 Ok(Arc::new(
1071 CoalescePartitionsExec::new(input)
1072 .with_fetch(merge.fetch.map(|f| f as usize)),
1073 ))
1074 }
1075
1076 fn try_into_repartition_physical_plan(
1077 &self,
1078 repart: &protobuf::RepartitionExecNode,
1079 ctx: &PhysicalPlanDecodeContext<'_>,
1080 proto_converter: &dyn PhysicalProtoConverterExtension,
1081 ) -> Result<Arc<dyn ExecutionPlan>> {
1082 let input: Arc<dyn ExecutionPlan> =
1083 into_physical_plan(&repart.input, ctx, proto_converter)?;
1084 let partitioning = parse_protobuf_partitioning(
1085 repart.partitioning.as_ref(),
1086 ctx,
1087 input.schema().as_ref(),
1088 proto_converter,
1089 )?;
1090 let mut repart_exec = RepartitionExec::try_new(input, partitioning.unwrap())?;
1091 if repart.preserve_order {
1092 repart_exec = repart_exec.with_preserve_order();
1093 }
1094 Ok(Arc::new(repart_exec))
1095 }
1096
1097 fn try_into_global_limit_physical_plan(
1098 &self,
1099 limit: &protobuf::GlobalLimitExecNode,
1100 ctx: &PhysicalPlanDecodeContext<'_>,
1101 proto_converter: &dyn PhysicalProtoConverterExtension,
1102 ) -> Result<Arc<dyn ExecutionPlan>> {
1103 let input: Arc<dyn ExecutionPlan> =
1104 into_physical_plan(&limit.input, ctx, proto_converter)?;
1105 let fetch = if limit.fetch >= 0 {
1106 Some(limit.fetch as usize)
1107 } else {
1108 None
1109 };
1110 Ok(Arc::new(GlobalLimitExec::new(
1111 input,
1112 limit.skip as usize,
1113 fetch,
1114 )))
1115 }
1116
1117 fn try_into_local_limit_physical_plan(
1118 &self,
1119 limit: &protobuf::LocalLimitExecNode,
1120 ctx: &PhysicalPlanDecodeContext<'_>,
1121 proto_converter: &dyn PhysicalProtoConverterExtension,
1122 ) -> Result<Arc<dyn ExecutionPlan>> {
1123 let input: Arc<dyn ExecutionPlan> =
1124 into_physical_plan(&limit.input, ctx, proto_converter)?;
1125 Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
1126 }
1127
1128 fn try_into_window_physical_plan(
1129 &self,
1130 window_agg: &protobuf::WindowAggExecNode,
1131 ctx: &PhysicalPlanDecodeContext<'_>,
1132 proto_converter: &dyn PhysicalProtoConverterExtension,
1133 ) -> Result<Arc<dyn ExecutionPlan>> {
1134 let input: Arc<dyn ExecutionPlan> =
1135 into_physical_plan(&window_agg.input, ctx, proto_converter)?;
1136 let input_schema = input.schema();
1137
1138 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
1139 .window_expr
1140 .iter()
1141 .map(|window_expr| {
1142 parse_physical_window_expr(
1143 window_expr,
1144 ctx,
1145 input_schema.as_ref(),
1146 proto_converter,
1147 )
1148 })
1149 .collect::<Result<Vec<_>, _>>()?;
1150
1151 let partition_keys = window_agg
1152 .partition_keys
1153 .iter()
1154 .map(|expr| {
1155 proto_converter.proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1156 })
1157 .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
1158
1159 if let Some(input_order_mode) = window_agg.input_order_mode.as_ref() {
1160 let input_order_mode = match input_order_mode {
1161 window_agg_exec_node::InputOrderMode::Linear(_) => InputOrderMode::Linear,
1162 window_agg_exec_node::InputOrderMode::PartiallySorted(
1163 protobuf::PartiallySortedInputOrderMode { columns },
1164 ) => InputOrderMode::PartiallySorted(
1165 columns.iter().map(|c| *c as usize).collect(),
1166 ),
1167 window_agg_exec_node::InputOrderMode::Sorted(_) => InputOrderMode::Sorted,
1168 };
1169
1170 Ok(Arc::new(BoundedWindowAggExec::try_new(
1171 physical_window_expr,
1172 input,
1173 input_order_mode,
1174 !partition_keys.is_empty(),
1175 )?))
1176 } else {
1177 Ok(Arc::new(WindowAggExec::try_new(
1178 physical_window_expr,
1179 input,
1180 !partition_keys.is_empty(),
1181 )?))
1182 }
1183 }
1184
1185 fn try_into_aggregate_physical_plan(
1186 &self,
1187 hash_agg: &protobuf::AggregateExecNode,
1188 ctx: &PhysicalPlanDecodeContext<'_>,
1189 proto_converter: &dyn PhysicalProtoConverterExtension,
1190 ) -> Result<Arc<dyn ExecutionPlan>> {
1191 let input: Arc<dyn ExecutionPlan> =
1192 into_physical_plan(&hash_agg.input, ctx, proto_converter)?;
1193 let mode = protobuf::AggregateMode::try_from(hash_agg.mode).map_err(|_| {
1194 proto_error(format!(
1195 "Received a AggregateNode message with unknown AggregateMode {}",
1196 hash_agg.mode
1197 ))
1198 })?;
1199 let agg_mode: AggregateMode = match mode {
1200 protobuf::AggregateMode::Partial => AggregateMode::Partial,
1201 protobuf::AggregateMode::Final => AggregateMode::Final,
1202 protobuf::AggregateMode::FinalPartitioned => AggregateMode::FinalPartitioned,
1203 protobuf::AggregateMode::Single => AggregateMode::Single,
1204 protobuf::AggregateMode::SinglePartitioned => {
1205 AggregateMode::SinglePartitioned
1206 }
1207 protobuf::AggregateMode::PartialReduce => AggregateMode::PartialReduce,
1208 };
1209
1210 let num_expr = hash_agg.group_expr.len();
1211
1212 let group_expr = hash_agg
1213 .group_expr
1214 .iter()
1215 .zip(hash_agg.group_expr_name.iter())
1216 .map(|(expr, name)| {
1217 proto_converter
1218 .proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1219 .map(|expr| (expr, name.to_string()))
1220 })
1221 .collect::<Result<Vec<_>, _>>()?;
1222
1223 let null_expr = hash_agg
1224 .null_expr
1225 .iter()
1226 .zip(hash_agg.group_expr_name.iter())
1227 .map(|(expr, name)| {
1228 proto_converter
1229 .proto_to_physical_expr(expr, input.schema().as_ref(), ctx)
1230 .map(|expr| (expr, name.to_string()))
1231 })
1232 .collect::<Result<Vec<_>, _>>()?;
1233
1234 let groups: Vec<Vec<bool>> = if !hash_agg.groups.is_empty() {
1235 hash_agg
1236 .groups
1237 .chunks(num_expr)
1238 .map(|g| g.to_vec())
1239 .collect::<Vec<Vec<bool>>>()
1240 } else {
1241 vec![]
1242 };
1243
1244 let has_grouping_set = hash_agg.has_grouping_set;
1245
1246 let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
1247 internal_datafusion_err!("input_schema in AggregateNode is missing.")
1248 })?;
1249 let physical_schema: SchemaRef = SchemaRef::new(input_schema.try_into()?);
1250
1251 let physical_filter_expr = hash_agg
1252 .filter_expr
1253 .iter()
1254 .map(|expr| {
1255 expr.expr
1256 .as_ref()
1257 .map(|e| {
1258 proto_converter.proto_to_physical_expr(e, &physical_schema, ctx)
1259 })
1260 .transpose()
1261 })
1262 .collect::<Result<Vec<_>, _>>()?;
1263
1264 let physical_aggr_expr: Vec<Arc<AggregateFunctionExpr>> = hash_agg
1265 .aggr_expr
1266 .iter()
1267 .zip(hash_agg.aggr_expr_name.iter())
1268 .map(|(expr, name)| {
1269 let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
1270 proto_error("Unexpected empty aggregate physical expression")
1271 })?;
1272
1273 match expr_type {
1274 ExprType::AggregateExpr(agg_node) => {
1275 let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
1276 .expr
1277 .iter()
1278 .map(|e| {
1279 proto_converter.proto_to_physical_expr(
1280 e,
1281 &physical_schema,
1282 ctx,
1283 )
1284 })
1285 .collect::<Result<Vec<_>>>()?;
1286 let order_bys = agg_node
1287 .ordering_req
1288 .iter()
1289 .map(|e| {
1290 parse_physical_sort_expr(
1291 e,
1292 ctx,
1293 &physical_schema,
1294 proto_converter,
1295 )
1296 })
1297 .collect::<Result<_>>()?;
1298 agg_node
1299 .aggregate_function
1300 .as_ref()
1301 .map(|func| match func {
1302 AggregateFunction::UserDefinedAggrFunction(udaf_name) => {
1303 let agg_udf = match &agg_node.fun_definition {
1304 Some(buf) => {
1305 ctx.codec().try_decode_udaf(udaf_name, buf)?
1306 }
1307 None => ctx.task_ctx().udaf(udaf_name).or_else(
1308 |_| {
1309 ctx.codec()
1310 .try_decode_udaf(udaf_name, &[])
1311 },
1312 )?,
1313 };
1314
1315 let (human_display, human_display_alias) =
1316 split_human_display_alias(
1317 &agg_node.human_display,
1318 name,
1319 );
1320 let builder = AggregateExprBuilder::new(
1321 agg_udf,
1322 input_phy_expr,
1323 )
1324 .schema(Arc::clone(&physical_schema))
1325 .alias(name)
1326 .with_ignore_nulls(agg_node.ignore_nulls)
1327 .with_distinct(agg_node.distinct)
1328 .order_by(order_bys)
1329 .human_display(human_display);
1330 let builder = if let Some(alias) = human_display_alias
1331 {
1332 builder.human_display_alias(alias)
1333 } else {
1334 builder
1335 };
1336 builder.build().map(Arc::new)
1337 }
1338 })
1339 .transpose()?
1340 .ok_or_else(|| {
1341 proto_error(
1342 "Invalid AggregateExpr, missing aggregate_function",
1343 )
1344 })
1345 }
1346 _ => internal_err!("Invalid aggregate expression for AggregateExec"),
1347 }
1348 })
1349 .collect::<Result<Vec<_>, _>>()?;
1350
1351 let physical_schema_ref = Arc::clone(&physical_schema);
1352 let agg = AggregateExec::try_new(
1353 agg_mode,
1354 PhysicalGroupBy::new(group_expr, null_expr, groups, has_grouping_set),
1355 physical_aggr_expr,
1356 physical_filter_expr,
1357 input,
1358 physical_schema,
1359 )?;
1360
1361 let agg = if let Some(limit_proto) = &hash_agg.limit {
1362 let limit = limit_proto.limit as usize;
1363 let limit_options = match limit_proto.descending {
1364 Some(descending) => LimitOptions::new_with_order(limit, descending),
1365 None => LimitOptions::new(limit),
1366 };
1367 agg.with_limit_options(Some(limit_options))
1368 } else {
1369 agg
1370 };
1371
1372 let agg = if let Some(dynamic_filter_proto) = &hash_agg.dynamic_filter {
1373 let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1374 dynamic_filter_proto,
1375 physical_schema_ref.as_ref(),
1376 ctx,
1377 )?;
1378 let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1379 .downcast::<DynamicFilterPhysicalExpr>()
1380 .map_err(|_| {
1381 internal_datafusion_err!(
1382 "AggregateExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1383 )
1384 })?;
1385 agg.with_dynamic_filter_expr(df)?
1386 } else {
1387 agg
1388 };
1389
1390 Ok(Arc::new(agg))
1391 }
1392
1393 fn try_into_hash_join_physical_plan(
1394 &self,
1395 hashjoin: &protobuf::HashJoinExecNode,
1396 ctx: &PhysicalPlanDecodeContext<'_>,
1397 proto_converter: &dyn PhysicalProtoConverterExtension,
1398 ) -> Result<Arc<dyn ExecutionPlan>> {
1399 let left: Arc<dyn ExecutionPlan> =
1400 into_physical_plan(&hashjoin.left, ctx, proto_converter)?;
1401 let right: Arc<dyn ExecutionPlan> =
1402 into_physical_plan(&hashjoin.right, ctx, proto_converter)?;
1403 let left_schema = left.schema();
1404 let right_schema = right.schema();
1405 let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = hashjoin
1406 .on
1407 .iter()
1408 .map(|col| {
1409 let left = proto_converter.proto_to_physical_expr(
1410 &col.left.clone().unwrap(),
1411 left_schema.as_ref(),
1412 ctx,
1413 )?;
1414 let right = proto_converter.proto_to_physical_expr(
1415 &col.right.clone().unwrap(),
1416 right_schema.as_ref(),
1417 ctx,
1418 )?;
1419 Ok((left, right))
1420 })
1421 .collect::<Result<_>>()?;
1422 let join_type =
1423 protobuf::JoinType::try_from(hashjoin.join_type).map_err(|_| {
1424 proto_error(format!(
1425 "Received a HashJoinNode message with unknown JoinType {}",
1426 hashjoin.join_type
1427 ))
1428 })?;
1429 let null_equality = protobuf::NullEquality::try_from(hashjoin.null_equality)
1430 .map_err(|_| {
1431 proto_error(format!(
1432 "Received a HashJoinNode message with unknown NullEquality {}",
1433 hashjoin.null_equality
1434 ))
1435 })?;
1436 let filter = hashjoin
1437 .filter
1438 .as_ref()
1439 .map(|f| {
1440 let schema = f
1441 .schema
1442 .as_ref()
1443 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1444 .try_into()?;
1445
1446 let expression = proto_converter.proto_to_physical_expr(
1447 f.expression.as_ref().ok_or_else(|| {
1448 proto_error("Unexpected empty filter expression")
1449 })?,
1450 &schema,
1451 ctx,
1452 )?;
1453 let column_indices = f.column_indices
1454 .iter()
1455 .map(|i| {
1456 let side = protobuf::JoinSide::try_from(i.side)
1457 .map_err(|_| proto_error(format!(
1458 "Received a HashJoinNode message with JoinSide in Filter {}",
1459 i.side))
1460 )?;
1461
1462 Ok(ColumnIndex {
1463 index: i.index as usize,
1464 side: side.into(),
1465 })
1466 })
1467 .collect::<Result<Vec<_>>>()?;
1468
1469 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1470 })
1471 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1472
1473 let partition_mode = protobuf::PartitionMode::try_from(hashjoin.partition_mode)
1474 .map_err(|_| {
1475 proto_error(format!(
1476 "Received a HashJoinNode message with unknown PartitionMode {}",
1477 hashjoin.partition_mode
1478 ))
1479 })?;
1480 let partition_mode = match partition_mode {
1481 protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
1482 protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
1483 protobuf::PartitionMode::Auto => PartitionMode::Auto,
1484 };
1485 let projection = if !hashjoin.projection.is_empty() {
1486 Some(
1487 hashjoin
1488 .projection
1489 .iter()
1490 .map(|i| *i as usize)
1491 .collect::<Vec<_>>(),
1492 )
1493 } else {
1494 None
1495 };
1496 let mut hash_join = HashJoinExec::try_new(
1497 left,
1498 right,
1499 on,
1500 filter,
1501 &join_type.into(),
1502 projection,
1503 partition_mode,
1504 null_equality.into(),
1505 hashjoin.null_aware,
1506 )?;
1507
1508 if let Some(dynamic_filter_proto) = &hashjoin.dynamic_filter {
1509 let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1510 dynamic_filter_proto,
1511 right_schema.as_ref(),
1512 ctx,
1513 )?;
1514 let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1515 .downcast::<DynamicFilterPhysicalExpr>()
1516 .map_err(|_| {
1517 internal_datafusion_err!(
1518 "HashJoinExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1519 )
1520 })?;
1521 hash_join = hash_join.with_dynamic_filter_expr(df)?;
1522 }
1523
1524 Ok(Arc::new(hash_join))
1525 }
1526
1527 fn try_into_symmetric_hash_join_physical_plan(
1528 &self,
1529 sym_join: &protobuf::SymmetricHashJoinExecNode,
1530 ctx: &PhysicalPlanDecodeContext<'_>,
1531 proto_converter: &dyn PhysicalProtoConverterExtension,
1532 ) -> Result<Arc<dyn ExecutionPlan>> {
1533 let left = into_physical_plan(&sym_join.left, ctx, proto_converter)?;
1534 let right = into_physical_plan(&sym_join.right, ctx, proto_converter)?;
1535 let left_schema = left.schema();
1536 let right_schema = right.schema();
1537 let on = sym_join
1538 .on
1539 .iter()
1540 .map(|col| {
1541 let left = proto_converter.proto_to_physical_expr(
1542 &col.left.clone().unwrap(),
1543 left_schema.as_ref(),
1544 ctx,
1545 )?;
1546 let right = proto_converter.proto_to_physical_expr(
1547 &col.right.clone().unwrap(),
1548 right_schema.as_ref(),
1549 ctx,
1550 )?;
1551 Ok((left, right))
1552 })
1553 .collect::<Result<_>>()?;
1554 let join_type =
1555 protobuf::JoinType::try_from(sym_join.join_type).map_err(|_| {
1556 proto_error(format!(
1557 "Received a SymmetricHashJoin message with unknown JoinType {}",
1558 sym_join.join_type
1559 ))
1560 })?;
1561 let null_equality = protobuf::NullEquality::try_from(sym_join.null_equality)
1562 .map_err(|_| {
1563 proto_error(format!(
1564 "Received a SymmetricHashJoin message with unknown NullEquality {}",
1565 sym_join.null_equality
1566 ))
1567 })?;
1568 let filter = sym_join
1569 .filter
1570 .as_ref()
1571 .map(|f| {
1572 let schema = f
1573 .schema
1574 .as_ref()
1575 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1576 .try_into()?;
1577
1578 let expression = proto_converter.proto_to_physical_expr(
1579 f.expression.as_ref().ok_or_else(|| {
1580 proto_error("Unexpected empty filter expression")
1581 })?,
1582 &schema,
1583 ctx,
1584 )?;
1585 let column_indices = f.column_indices
1586 .iter()
1587 .map(|i| {
1588 let side = protobuf::JoinSide::try_from(i.side)
1589 .map_err(|_| proto_error(format!(
1590 "Received a HashJoinNode message with JoinSide in Filter {}",
1591 i.side))
1592 )?;
1593
1594 Ok(ColumnIndex {
1595 index: i.index as usize,
1596 side: side.into(),
1597 })
1598 })
1599 .collect::<Result<_>>()?;
1600
1601 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1602 })
1603 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1604
1605 let left_sort_exprs = parse_physical_sort_exprs(
1606 &sym_join.left_sort_exprs,
1607 ctx,
1608 &left_schema,
1609 proto_converter,
1610 )?;
1611 let left_sort_exprs = LexOrdering::new(left_sort_exprs);
1612
1613 let right_sort_exprs = parse_physical_sort_exprs(
1614 &sym_join.right_sort_exprs,
1615 ctx,
1616 &right_schema,
1617 proto_converter,
1618 )?;
1619 let right_sort_exprs = LexOrdering::new(right_sort_exprs);
1620
1621 let partition_mode = protobuf::StreamPartitionMode::try_from(
1622 sym_join.partition_mode,
1623 )
1624 .map_err(|_| {
1625 proto_error(format!(
1626 "Received a SymmetricHashJoin message with unknown PartitionMode {}",
1627 sym_join.partition_mode
1628 ))
1629 })?;
1630 let partition_mode = match partition_mode {
1631 protobuf::StreamPartitionMode::SinglePartition => {
1632 StreamJoinPartitionMode::SinglePartition
1633 }
1634 protobuf::StreamPartitionMode::PartitionedExec => {
1635 StreamJoinPartitionMode::Partitioned
1636 }
1637 };
1638 SymmetricHashJoinExec::try_new(
1639 left,
1640 right,
1641 on,
1642 filter,
1643 &join_type.into(),
1644 null_equality.into(),
1645 left_sort_exprs,
1646 right_sort_exprs,
1647 partition_mode,
1648 )
1649 .map(|e| Arc::new(e) as _)
1650 }
1651
1652 fn try_into_union_physical_plan(
1653 &self,
1654 union: &protobuf::UnionExecNode,
1655 ctx: &PhysicalPlanDecodeContext<'_>,
1656 proto_converter: &dyn PhysicalProtoConverterExtension,
1657 ) -> Result<Arc<dyn ExecutionPlan>> {
1658 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1659 for input in &union.inputs {
1660 inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?);
1661 }
1662 UnionExec::try_new(inputs)
1663 }
1664
1665 fn try_into_interleave_physical_plan(
1666 &self,
1667 interleave: &protobuf::InterleaveExecNode,
1668 ctx: &PhysicalPlanDecodeContext<'_>,
1669 proto_converter: &dyn PhysicalProtoConverterExtension,
1670 ) -> Result<Arc<dyn ExecutionPlan>> {
1671 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
1672 for input in &interleave.inputs {
1673 inputs.push(proto_converter.proto_to_execution_plan(input, ctx)?);
1674 }
1675 Ok(Arc::new(InterleaveExec::try_new(inputs)?))
1676 }
1677
1678 fn try_into_cross_join_physical_plan(
1679 &self,
1680 crossjoin: &protobuf::CrossJoinExecNode,
1681 ctx: &PhysicalPlanDecodeContext<'_>,
1682 proto_converter: &dyn PhysicalProtoConverterExtension,
1683 ) -> Result<Arc<dyn ExecutionPlan>> {
1684 let left: Arc<dyn ExecutionPlan> =
1685 into_physical_plan(&crossjoin.left, ctx, proto_converter)?;
1686 let right: Arc<dyn ExecutionPlan> =
1687 into_physical_plan(&crossjoin.right, ctx, proto_converter)?;
1688 Ok(Arc::new(CrossJoinExec::new(left, right)))
1689 }
1690
1691 fn try_into_empty_physical_plan(
1692 &self,
1693 empty: &protobuf::EmptyExecNode,
1694 _ctx: &PhysicalPlanDecodeContext<'_>,
1695 _proto_converter: &dyn PhysicalProtoConverterExtension,
1696 ) -> Result<Arc<dyn ExecutionPlan>> {
1697 let schema = Arc::new(convert_required!(empty.schema)?);
1698 Ok(Arc::new(EmptyExec::new(schema)))
1699 }
1700
1701 fn try_into_placeholder_row_physical_plan(
1702 &self,
1703 placeholder: &protobuf::PlaceholderRowExecNode,
1704 _ctx: &PhysicalPlanDecodeContext<'_>,
1705 ) -> Result<Arc<dyn ExecutionPlan>> {
1706 let schema = Arc::new(convert_required!(placeholder.schema)?);
1707 Ok(Arc::new(PlaceholderRowExec::new(schema)))
1708 }
1709
1710 fn try_into_sort_physical_plan(
1711 &self,
1712 sort: &protobuf::SortExecNode,
1713 ctx: &PhysicalPlanDecodeContext<'_>,
1714 proto_converter: &dyn PhysicalProtoConverterExtension,
1715 ) -> Result<Arc<dyn ExecutionPlan>> {
1716 let input = into_physical_plan(&sort.input, ctx, proto_converter)?;
1717 let exprs = sort
1718 .expr
1719 .iter()
1720 .map(|expr| {
1721 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1722 proto_error(format!(
1723 "physical_plan::from_proto() Unexpected expr {self:?}"
1724 ))
1725 })?;
1726 if let ExprType::Sort(sort_expr) = expr {
1727 let expr = sort_expr
1728 .expr
1729 .as_ref()
1730 .ok_or_else(|| {
1731 proto_error(format!(
1732 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1733 ))
1734 })?
1735 .as_ref();
1736 Ok(PhysicalSortExpr {
1737 expr: proto_converter.proto_to_physical_expr(
1738 expr,
1739 input.schema().as_ref(),
1740 ctx,
1741 )?,
1742 options: SortOptions {
1743 descending: !sort_expr.asc,
1744 nulls_first: sort_expr.nulls_first,
1745 },
1746 })
1747 } else {
1748 internal_err!(
1749 "physical_plan::from_proto() {self:?}"
1750 )
1751 }
1752 })
1753 .collect::<Result<Vec<_>>>()?;
1754 let Some(ordering) = LexOrdering::new(exprs) else {
1755 return internal_err!("SortExec requires an ordering");
1756 };
1757 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1758 let new_sort = SortExec::new(ordering, input)
1759 .with_fetch(fetch)
1760 .with_preserve_partitioning(sort.preserve_partitioning);
1761
1762 let new_sort = if let Some(dynamic_filter_proto) = &sort.dynamic_filter {
1763 let dynamic_filter_expr = proto_converter.proto_to_physical_expr(
1764 dynamic_filter_proto,
1765 new_sort.input().schema().as_ref(),
1766 ctx,
1767 )?;
1768 let df = (dynamic_filter_expr as Arc<dyn Any + Send + Sync>)
1769 .downcast::<DynamicFilterPhysicalExpr>()
1770 .map_err(|_| {
1771 internal_datafusion_err!(
1772 "SortExec dynamic_filter did not decode to a DynamicFilterPhysicalExpr"
1773 )
1774 })?;
1775 new_sort.with_dynamic_filter_expr(df)?
1776 } else {
1777 new_sort
1778 };
1779
1780 Ok(Arc::new(new_sort))
1781 }
1782
1783 fn try_into_sort_preserving_merge_physical_plan(
1784 &self,
1785 sort: &protobuf::SortPreservingMergeExecNode,
1786 ctx: &PhysicalPlanDecodeContext<'_>,
1787 proto_converter: &dyn PhysicalProtoConverterExtension,
1788 ) -> Result<Arc<dyn ExecutionPlan>> {
1789 let input = into_physical_plan(&sort.input, ctx, proto_converter)?;
1790 let exprs = sort
1791 .expr
1792 .iter()
1793 .map(|expr| {
1794 let expr = expr.expr_type.as_ref().ok_or_else(|| {
1795 proto_error(format!(
1796 "physical_plan::from_proto() Unexpected expr {self:?}"
1797 ))
1798 })?;
1799 if let ExprType::Sort(sort_expr) = expr {
1800 let expr = sort_expr
1801 .expr
1802 .as_ref()
1803 .ok_or_else(|| {
1804 proto_error(format!(
1805 "physical_plan::from_proto() Unexpected sort expr {self:?}"
1806 ))
1807 })?
1808 .as_ref();
1809 Ok(PhysicalSortExpr {
1810 expr: proto_converter.proto_to_physical_expr(
1811 expr,
1812 input.schema().as_ref(),
1813 ctx,
1814 )?,
1815 options: SortOptions {
1816 descending: !sort_expr.asc,
1817 nulls_first: sort_expr.nulls_first,
1818 },
1819 })
1820 } else {
1821 internal_err!("physical_plan::from_proto() {self:?}")
1822 }
1823 })
1824 .collect::<Result<Vec<_>>>()?;
1825 let Some(ordering) = LexOrdering::new(exprs) else {
1826 return internal_err!("SortExec requires an ordering");
1827 };
1828 let fetch = (sort.fetch >= 0).then_some(sort.fetch as _);
1829 Ok(Arc::new(
1830 SortPreservingMergeExec::new(ordering, input).with_fetch(fetch),
1831 ))
1832 }
1833
1834 fn try_into_extension_physical_plan(
1835 &self,
1836 extension: &protobuf::PhysicalExtensionNode,
1837 ctx: &PhysicalPlanDecodeContext<'_>,
1838 proto_converter: &dyn PhysicalProtoConverterExtension,
1839 ) -> Result<Arc<dyn ExecutionPlan>> {
1840 let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
1841 .inputs
1842 .iter()
1843 .map(|i| proto_converter.proto_to_execution_plan(i, ctx))
1844 .collect::<Result<_>>()?;
1845
1846 let extension_node =
1847 ctx.codec()
1848 .try_decode(extension.node.as_slice(), &inputs, ctx.task_ctx())?;
1849
1850 Ok(extension_node)
1851 }
1852
1853 fn try_into_nested_loop_join_physical_plan(
1854 &self,
1855 join: &protobuf::NestedLoopJoinExecNode,
1856 ctx: &PhysicalPlanDecodeContext<'_>,
1857 proto_converter: &dyn PhysicalProtoConverterExtension,
1858 ) -> Result<Arc<dyn ExecutionPlan>> {
1859 let left: Arc<dyn ExecutionPlan> =
1860 into_physical_plan(&join.left, ctx, proto_converter)?;
1861 let right: Arc<dyn ExecutionPlan> =
1862 into_physical_plan(&join.right, ctx, proto_converter)?;
1863 let join_type = protobuf::JoinType::try_from(join.join_type).map_err(|_| {
1864 proto_error(format!(
1865 "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
1866 join.join_type
1867 ))
1868 })?;
1869 let filter = join
1870 .filter
1871 .as_ref()
1872 .map(|f| {
1873 let schema = f
1874 .schema
1875 .as_ref()
1876 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
1877 .try_into()?;
1878
1879 let expression = proto_converter
1880 .proto_to_physical_expr(
1881 f.expression.as_ref().ok_or_else(|| {
1882 proto_error("Unexpected empty filter expression")
1883 })?,
1884 &schema,
1885 ctx,
1886 )?;
1887 let column_indices = f.column_indices
1888 .iter()
1889 .map(|i| {
1890 let side = protobuf::JoinSide::try_from(i.side)
1891 .map_err(|_| proto_error(format!(
1892 "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
1893 i.side))
1894 )?;
1895
1896 Ok(ColumnIndex {
1897 index: i.index as usize,
1898 side: side.into(),
1899 })
1900 })
1901 .collect::<Result<Vec<_>>>()?;
1902
1903 Ok(JoinFilter::new(expression, column_indices, Arc::new(schema)))
1904 })
1905 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
1906
1907 let projection = if !join.projection.is_empty() {
1908 Some(
1909 join.projection
1910 .iter()
1911 .map(|i| *i as usize)
1912 .collect::<Vec<_>>(),
1913 )
1914 } else {
1915 None
1916 };
1917
1918 Ok(Arc::new(NestedLoopJoinExec::try_new(
1919 left,
1920 right,
1921 filter,
1922 &join_type.into(),
1923 projection,
1924 )?))
1925 }
1926
1927 fn try_into_analyze_physical_plan(
1928 &self,
1929 analyze: &protobuf::AnalyzeExecNode,
1930 ctx: &PhysicalPlanDecodeContext<'_>,
1931 proto_converter: &dyn PhysicalProtoConverterExtension,
1932 ) -> Result<Arc<dyn ExecutionPlan>> {
1933 let input: Arc<dyn ExecutionPlan> =
1934 into_physical_plan(&analyze.input, ctx, proto_converter)?;
1935 let metric_categories = if analyze.has_metric_categories {
1936 let cats: Result<Vec<MetricCategory>> = analyze
1937 .metric_categories
1938 .iter()
1939 .map(|s| s.parse::<MetricCategory>())
1940 .collect();
1941 Some(cats?)
1942 } else {
1943 None
1944 };
1945 Ok(Arc::new(AnalyzeExec::new(
1946 analyze.verbose,
1947 analyze.show_statistics,
1948 vec![MetricType::Summary, MetricType::Dev],
1949 metric_categories,
1950 input,
1951 Arc::new(convert_required!(analyze.schema)?),
1952 )))
1953 }
1954
1955 fn try_into_json_sink_physical_plan(
1956 &self,
1957 sink: &protobuf::JsonSinkExecNode,
1958 ctx: &PhysicalPlanDecodeContext<'_>,
1959 proto_converter: &dyn PhysicalProtoConverterExtension,
1960 ) -> Result<Arc<dyn ExecutionPlan>> {
1961 let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
1962
1963 let data_sink: JsonSink = sink
1964 .sink
1965 .as_ref()
1966 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
1967 .try_into()?;
1968 let sink_schema = input.schema();
1969 let sort_order = sink
1970 .sort_order
1971 .as_ref()
1972 .map(|collection| {
1973 parse_physical_sort_exprs(
1974 &collection.physical_sort_expr_nodes,
1975 ctx,
1976 &sink_schema,
1977 proto_converter,
1978 )
1979 .map(|sort_exprs| {
1980 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
1981 })
1982 })
1983 .transpose()?
1984 .flatten();
1985 Ok(Arc::new(DataSinkExec::new(
1986 input,
1987 Arc::new(data_sink),
1988 sort_order,
1989 )))
1990 }
1991
1992 fn try_into_csv_sink_physical_plan(
1993 &self,
1994 sink: &protobuf::CsvSinkExecNode,
1995 ctx: &PhysicalPlanDecodeContext<'_>,
1996 proto_converter: &dyn PhysicalProtoConverterExtension,
1997 ) -> Result<Arc<dyn ExecutionPlan>> {
1998 let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
1999
2000 let data_sink: CsvSink = sink
2001 .sink
2002 .as_ref()
2003 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
2004 .try_into()?;
2005 let sink_schema = input.schema();
2006 let sort_order = sink
2007 .sort_order
2008 .as_ref()
2009 .map(|collection| {
2010 parse_physical_sort_exprs(
2011 &collection.physical_sort_expr_nodes,
2012 ctx,
2013 &sink_schema,
2014 proto_converter,
2015 )
2016 .map(|sort_exprs| {
2017 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
2018 })
2019 })
2020 .transpose()?
2021 .flatten();
2022 Ok(Arc::new(DataSinkExec::new(
2023 input,
2024 Arc::new(data_sink),
2025 sort_order,
2026 )))
2027 }
2028
2029 #[cfg_attr(not(feature = "parquet"), expect(unused_variables))]
2030 fn try_into_parquet_sink_physical_plan(
2031 &self,
2032 sink: &protobuf::ParquetSinkExecNode,
2033 ctx: &PhysicalPlanDecodeContext<'_>,
2034 proto_converter: &dyn PhysicalProtoConverterExtension,
2035 ) -> Result<Arc<dyn ExecutionPlan>> {
2036 #[cfg(feature = "parquet")]
2037 {
2038 let input = into_physical_plan(&sink.input, ctx, proto_converter)?;
2039
2040 let data_sink: ParquetSink = sink
2041 .sink
2042 .as_ref()
2043 .ok_or_else(|| proto_error("Missing required field in protobuf"))?
2044 .try_into()?;
2045 let sink_schema = input.schema();
2046 let sort_order = sink
2047 .sort_order
2048 .as_ref()
2049 .map(|collection| {
2050 parse_physical_sort_exprs(
2051 &collection.physical_sort_expr_nodes,
2052 ctx,
2053 &sink_schema,
2054 proto_converter,
2055 )
2056 .map(|sort_exprs| {
2057 LexRequirement::new(sort_exprs.into_iter().map(Into::into))
2058 })
2059 })
2060 .transpose()?
2061 .flatten();
2062 Ok(Arc::new(DataSinkExec::new(
2063 input,
2064 Arc::new(data_sink),
2065 sort_order,
2066 )))
2067 }
2068 #[cfg(not(feature = "parquet"))]
2069 panic!("Trying to use ParquetSink without `parquet` feature enabled");
2070 }
2071
2072 fn try_into_unnest_physical_plan(
2073 &self,
2074 unnest: &protobuf::UnnestExecNode,
2075 ctx: &PhysicalPlanDecodeContext<'_>,
2076 proto_converter: &dyn PhysicalProtoConverterExtension,
2077 ) -> Result<Arc<dyn ExecutionPlan>> {
2078 let input = into_physical_plan(&unnest.input, ctx, proto_converter)?;
2079
2080 Ok(Arc::new(UnnestExec::new(
2081 input,
2082 unnest
2083 .list_type_columns
2084 .iter()
2085 .map(|c| ListUnnest {
2086 index_in_input_schema: c.index_in_input_schema as _,
2087 depth: c.depth as _,
2088 })
2089 .collect(),
2090 unnest.struct_type_columns.iter().map(|c| *c as _).collect(),
2091 Arc::new(convert_required!(unnest.schema)?),
2092 into_required!(unnest.options)?,
2093 )?))
2094 }
2095
2096 fn generate_series_name_to_str(name: protobuf::GenerateSeriesName) -> &'static str {
2097 match name {
2098 protobuf::GenerateSeriesName::GsGenerateSeries => "generate_series",
2099 protobuf::GenerateSeriesName::GsRange => "range",
2100 }
2101 }
2102 fn try_into_sort_join(
2103 &self,
2104 sort_join: &SortMergeJoinExecNode,
2105 ctx: &PhysicalPlanDecodeContext<'_>,
2106 proto_converter: &dyn PhysicalProtoConverterExtension,
2107 ) -> Result<Arc<dyn ExecutionPlan>> {
2108 let left = into_physical_plan(&sort_join.left, ctx, proto_converter)?;
2109 let left_schema = left.schema();
2110 let right = into_physical_plan(&sort_join.right, ctx, proto_converter)?;
2111 let right_schema = right.schema();
2112
2113 let filter = sort_join
2114 .filter
2115 .as_ref()
2116 .map(|f| {
2117 let schema = f
2118 .schema
2119 .as_ref()
2120 .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
2121 .try_into()?;
2122
2123 let expression = proto_converter.proto_to_physical_expr(
2124 f.expression.as_ref().ok_or_else(|| {
2125 proto_error("Unexpected empty filter expression")
2126 })?,
2127 &schema,
2128 ctx,
2129 )?;
2130 let column_indices = f
2131 .column_indices
2132 .iter()
2133 .map(|i| {
2134 let side =
2135 protobuf::JoinSide::try_from(i.side).map_err(|_| {
2136 proto_error(format!(
2137 "Received a SortMergeJoinExecNode message with JoinSide in Filter {}",
2138 i.side
2139 ))
2140 })?;
2141
2142 Ok(ColumnIndex {
2143 index: i.index as usize,
2144 side: side.into(),
2145 })
2146 })
2147 .collect::<Result<Vec<_>>>()?;
2148
2149 Ok(JoinFilter::new(
2150 expression,
2151 column_indices,
2152 Arc::new(schema),
2153 ))
2154 })
2155 .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
2156
2157 let join_type =
2158 protobuf::JoinType::try_from(sort_join.join_type).map_err(|_| {
2159 proto_error(format!(
2160 "Received a SortMergeJoinExecNode message with unknown JoinType {}",
2161 sort_join.join_type
2162 ))
2163 })?;
2164
2165 let null_equality = protobuf::NullEquality::try_from(sort_join.null_equality)
2166 .map_err(|_| {
2167 proto_error(format!(
2168 "Received a SortMergeJoinExecNode message with unknown NullEquality {}",
2169 sort_join.null_equality
2170 ))
2171 })?;
2172
2173 let sort_options = sort_join
2174 .sort_options
2175 .iter()
2176 .map(|e| SortOptions {
2177 descending: !e.asc,
2178 nulls_first: e.nulls_first,
2179 })
2180 .collect();
2181 let on = sort_join
2182 .on
2183 .iter()
2184 .map(|col| {
2185 let left = proto_converter.proto_to_physical_expr(
2186 &col.left.clone().unwrap(),
2187 left_schema.as_ref(),
2188 ctx,
2189 )?;
2190 let right = proto_converter.proto_to_physical_expr(
2191 &col.right.clone().unwrap(),
2192 right_schema.as_ref(),
2193 ctx,
2194 )?;
2195 Ok((left, right))
2196 })
2197 .collect::<Result<_>>()?;
2198
2199 Ok(Arc::new(SortMergeJoinExec::try_new(
2200 left,
2201 right,
2202 on,
2203 filter,
2204 join_type.into(),
2205 sort_options,
2206 null_equality.into(),
2207 )?))
2208 }
2209
2210 fn try_into_generate_series_physical_plan(
2211 &self,
2212 generate_series: &protobuf::GenerateSeriesNode,
2213 ) -> Result<Arc<dyn ExecutionPlan>> {
2214 let schema: SchemaRef = Arc::new(convert_required!(generate_series.schema)?);
2215
2216 let args = match &generate_series.args {
2217 Some(protobuf::generate_series_node::Args::ContainsNull(args)) => {
2218 GenSeriesArgs::ContainsNull {
2219 name: Self::generate_series_name_to_str(args.name()),
2220 }
2221 }
2222 Some(protobuf::generate_series_node::Args::Int64Args(args)) => {
2223 GenSeriesArgs::Int64Args {
2224 start: args.start,
2225 end: args.end,
2226 step: args.step,
2227 include_end: args.include_end,
2228 name: Self::generate_series_name_to_str(args.name()),
2229 }
2230 }
2231 Some(protobuf::generate_series_node::Args::TimestampArgs(args)) => {
2232 let step_proto = args.step.as_ref().ok_or_else(|| {
2233 internal_datafusion_err!("Missing step in TimestampArgs")
2234 })?;
2235 let step = IntervalMonthDayNanoType::make_value(
2236 step_proto.months,
2237 step_proto.days,
2238 step_proto.nanos,
2239 );
2240 GenSeriesArgs::TimestampArgs {
2241 start: args.start,
2242 end: args.end,
2243 step,
2244 tz: args.tz.as_ref().map(|s| Arc::from(s.as_str())),
2245 include_end: args.include_end,
2246 name: Self::generate_series_name_to_str(args.name()),
2247 }
2248 }
2249 Some(protobuf::generate_series_node::Args::DateArgs(args)) => {
2250 let step_proto = args.step.as_ref().ok_or_else(|| {
2251 internal_datafusion_err!("Missing step in DateArgs")
2252 })?;
2253 let step = IntervalMonthDayNanoType::make_value(
2254 step_proto.months,
2255 step_proto.days,
2256 step_proto.nanos,
2257 );
2258 GenSeriesArgs::DateArgs {
2259 start: args.start,
2260 end: args.end,
2261 step,
2262 include_end: args.include_end,
2263 name: Self::generate_series_name_to_str(args.name()),
2264 }
2265 }
2266 None => return internal_err!("Missing args in GenerateSeriesNode"),
2267 };
2268
2269 let table = GenerateSeriesTable::new(Arc::clone(&schema), args);
2270 let generator = table.as_generator(generate_series.target_batch_size as usize)?;
2271
2272 Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?))
2273 }
2274
2275 fn try_into_cooperative_physical_plan(
2276 &self,
2277 field_stream: &protobuf::CooperativeExecNode,
2278 ctx: &PhysicalPlanDecodeContext<'_>,
2279 proto_converter: &dyn PhysicalProtoConverterExtension,
2280 ) -> Result<Arc<dyn ExecutionPlan>> {
2281 let input = into_physical_plan(&field_stream.input, ctx, proto_converter)?;
2282 Ok(Arc::new(CooperativeExec::new(input)))
2283 }
2284
2285 fn try_into_async_func_physical_plan(
2286 &self,
2287 async_func: &protobuf::AsyncFuncExecNode,
2288 ctx: &PhysicalPlanDecodeContext<'_>,
2289 proto_converter: &dyn PhysicalProtoConverterExtension,
2290 ) -> Result<Arc<dyn ExecutionPlan>> {
2291 let input: Arc<dyn ExecutionPlan> =
2292 into_physical_plan(&async_func.input, ctx, proto_converter)?;
2293
2294 if async_func.async_exprs.len() != async_func.async_expr_names.len() {
2295 return internal_err!(
2296 "AsyncFuncExecNode async_exprs length does not match async_expr_names"
2297 );
2298 }
2299
2300 let async_exprs = async_func
2301 .async_exprs
2302 .iter()
2303 .zip(async_func.async_expr_names.iter())
2304 .map(|(expr, name)| {
2305 let physical_expr = proto_converter.proto_to_physical_expr(
2306 expr,
2307 input.schema().as_ref(),
2308 ctx,
2309 )?;
2310
2311 Ok(Arc::new(AsyncFuncExpr::try_new(
2312 name.clone(),
2313 physical_expr,
2314 input.schema().as_ref(),
2315 )?))
2316 })
2317 .collect::<Result<Vec<_>>>()?;
2318
2319 Ok(Arc::new(AsyncFuncExec::try_new(async_exprs, input)?))
2320 }
2321
2322 fn try_into_buffer_physical_plan(
2323 &self,
2324 buffer: &protobuf::BufferExecNode,
2325 ctx: &PhysicalPlanDecodeContext<'_>,
2326 proto_converter: &dyn PhysicalProtoConverterExtension,
2327 ) -> Result<Arc<dyn ExecutionPlan>> {
2328 let input: Arc<dyn ExecutionPlan> =
2329 into_physical_plan(&buffer.input, ctx, proto_converter)?;
2330
2331 Ok(Arc::new(BufferExec::new(input, buffer.capacity as usize)))
2332 }
2333
2334 fn try_into_scalar_subquery_physical_plan(
2335 &self,
2336 sq: &protobuf::ScalarSubqueryExecNode,
2337 ctx: &PhysicalPlanDecodeContext<'_>,
2338 proto_converter: &dyn PhysicalProtoConverterExtension,
2339 ) -> Result<Arc<dyn ExecutionPlan>> {
2340 let subquery_results = ScalarSubqueryResults::new(sq.subqueries.len());
2343 let input_ctx = ctx.with_scalar_subquery_results(subquery_results.clone());
2344 let input = into_physical_plan(&sq.input, &input_ctx, proto_converter)?;
2345
2346 let subqueries: Vec<ScalarSubqueryLink> = sq
2348 .subqueries
2349 .iter()
2350 .enumerate()
2351 .map(|(index, sq_plan)| {
2352 let plan =
2353 sq_plan.try_into_physical_plan_with_context(ctx, proto_converter)?;
2354 Ok(ScalarSubqueryLink {
2355 plan,
2356 index: SubqueryIndex::new(index),
2357 })
2358 })
2359 .collect::<Result<Vec<_>>>()?;
2360
2361 Ok(Arc::new(ScalarSubqueryExec::new(
2362 input,
2363 subqueries,
2364 subquery_results,
2365 )))
2366 }
2367
2368 fn try_from_explain_exec(
2369 exec: &ExplainExec,
2370 _codec: &dyn PhysicalExtensionCodec,
2371 ) -> Result<Self> {
2372 Ok(protobuf::PhysicalPlanNode {
2373 physical_plan_type: Some(PhysicalPlanType::Explain(
2374 protobuf::ExplainExecNode {
2375 schema: Some(exec.schema().as_ref().try_into()?),
2376 stringified_plans: exec
2377 .stringified_plans()
2378 .iter()
2379 .map(|plan| plan.into())
2380 .collect(),
2381 verbose: exec.verbose(),
2382 },
2383 )),
2384 })
2385 }
2386
2387 fn try_from_projection_exec(
2388 exec: &ProjectionExec,
2389 codec: &dyn PhysicalExtensionCodec,
2390 proto_converter: &dyn PhysicalProtoConverterExtension,
2391 ) -> Result<Self> {
2392 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2393 exec.input().to_owned(),
2394 codec,
2395 proto_converter,
2396 )?;
2397 let expr = exec
2398 .expr()
2399 .iter()
2400 .map(|proj_expr| {
2401 proto_converter.physical_expr_to_proto(&proj_expr.expr, codec)
2402 })
2403 .collect::<Result<Vec<_>>>()?;
2404 let expr_name = exec
2405 .expr()
2406 .iter()
2407 .map(|proj_expr| proj_expr.alias.clone())
2408 .collect();
2409 Ok(protobuf::PhysicalPlanNode {
2410 physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
2411 protobuf::ProjectionExecNode {
2412 input: Some(Box::new(input)),
2413 expr,
2414 expr_name,
2415 },
2416 ))),
2417 })
2418 }
2419
2420 fn try_from_analyze_exec(
2421 exec: &AnalyzeExec,
2422 codec: &dyn PhysicalExtensionCodec,
2423 proto_converter: &dyn PhysicalProtoConverterExtension,
2424 ) -> Result<Self> {
2425 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2426 exec.input().to_owned(),
2427 codec,
2428 proto_converter,
2429 )?;
2430 let (has_metric_categories, metric_categories) = match exec.metric_categories() {
2431 Some(cats) => (true, cats.iter().map(|c| c.to_string()).collect()),
2432 None => (false, vec![]),
2433 };
2434 Ok(protobuf::PhysicalPlanNode {
2435 physical_plan_type: Some(PhysicalPlanType::Analyze(Box::new(
2436 protobuf::AnalyzeExecNode {
2437 verbose: exec.verbose(),
2438 show_statistics: exec.show_statistics(),
2439 input: Some(Box::new(input)),
2440 schema: Some(exec.schema().as_ref().try_into()?),
2441 has_metric_categories,
2442 metric_categories,
2443 },
2444 ))),
2445 })
2446 }
2447
2448 fn try_from_filter_exec(
2449 exec: &FilterExec,
2450 codec: &dyn PhysicalExtensionCodec,
2451 proto_converter: &dyn PhysicalProtoConverterExtension,
2452 ) -> Result<Self> {
2453 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2454 exec.input().to_owned(),
2455 codec,
2456 proto_converter,
2457 )?;
2458 Ok(protobuf::PhysicalPlanNode {
2459 physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
2460 protobuf::FilterExecNode {
2461 input: Some(Box::new(input)),
2462 expr: Some(
2463 proto_converter
2464 .physical_expr_to_proto(exec.predicate(), codec)?,
2465 ),
2466 default_filter_selectivity: exec.default_selectivity() as u32,
2467 projection: match exec.projection() {
2468 None => (0..exec.input().schema().fields().len())
2469 .map(|i| i as u32)
2470 .collect(),
2471 Some(v) => v.iter().map(|x| *x as u32).collect(),
2472 },
2473 batch_size: exec.batch_size() as u32,
2474 fetch: exec.fetch().map(|f| f as u32),
2475 },
2476 ))),
2477 })
2478 }
2479
2480 fn try_from_global_limit_exec(
2481 limit: &GlobalLimitExec,
2482 codec: &dyn PhysicalExtensionCodec,
2483 proto_converter: &dyn PhysicalProtoConverterExtension,
2484 ) -> Result<Self> {
2485 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2486 limit.input().to_owned(),
2487 codec,
2488 proto_converter,
2489 )?;
2490
2491 Ok(protobuf::PhysicalPlanNode {
2492 physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
2493 protobuf::GlobalLimitExecNode {
2494 input: Some(Box::new(input)),
2495 skip: limit.skip() as u32,
2496 fetch: match limit.fetch() {
2497 Some(n) => n as i64,
2498 _ => -1, },
2500 },
2501 ))),
2502 })
2503 }
2504
2505 fn try_from_local_limit_exec(
2506 limit: &LocalLimitExec,
2507 codec: &dyn PhysicalExtensionCodec,
2508 proto_converter: &dyn PhysicalProtoConverterExtension,
2509 ) -> Result<Self> {
2510 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2511 limit.input().to_owned(),
2512 codec,
2513 proto_converter,
2514 )?;
2515 Ok(protobuf::PhysicalPlanNode {
2516 physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
2517 protobuf::LocalLimitExecNode {
2518 input: Some(Box::new(input)),
2519 fetch: limit.fetch() as u32,
2520 },
2521 ))),
2522 })
2523 }
2524
2525 fn try_from_hash_join_exec(
2526 exec: &HashJoinExec,
2527 codec: &dyn PhysicalExtensionCodec,
2528 proto_converter: &dyn PhysicalProtoConverterExtension,
2529 ) -> Result<Self> {
2530 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2531 exec.left().to_owned(),
2532 codec,
2533 proto_converter,
2534 )?;
2535 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2536 exec.right().to_owned(),
2537 codec,
2538 proto_converter,
2539 )?;
2540 let on: Vec<protobuf::JoinOn> = exec
2541 .on()
2542 .iter()
2543 .map(|tuple| {
2544 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2545 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2546 Ok::<_, DataFusionError>(protobuf::JoinOn {
2547 left: Some(l),
2548 right: Some(r),
2549 })
2550 })
2551 .collect::<Result<_>>()?;
2552 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2553 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2554 let filter = exec
2555 .filter()
2556 .as_ref()
2557 .map(|f| {
2558 let expression =
2559 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2560 let column_indices = f
2561 .column_indices()
2562 .iter()
2563 .map(|i| {
2564 let side: protobuf::JoinSide = i.side.to_owned().into();
2565 protobuf::ColumnIndex {
2566 index: i.index as u32,
2567 side: side.into(),
2568 }
2569 })
2570 .collect();
2571 let schema = f.schema().as_ref().try_into()?;
2572 Ok(protobuf::JoinFilter {
2573 expression: Some(expression),
2574 column_indices,
2575 schema: Some(schema),
2576 })
2577 })
2578 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2579
2580 let partition_mode = match exec.partition_mode() {
2581 PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
2582 PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
2583 PartitionMode::Auto => protobuf::PartitionMode::Auto,
2584 };
2585
2586 let dynamic_filter = exec
2587 .dynamic_filter_expr()
2588 .map(|df| {
2589 let df_expr: Arc<dyn PhysicalExpr> =
2590 Arc::clone(df) as Arc<dyn PhysicalExpr>;
2591 proto_converter.physical_expr_to_proto(&df_expr, codec)
2592 })
2593 .transpose()?;
2594
2595 Ok(protobuf::PhysicalPlanNode {
2596 physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
2597 protobuf::HashJoinExecNode {
2598 left: Some(Box::new(left)),
2599 right: Some(Box::new(right)),
2600 on,
2601 join_type: join_type.into(),
2602 partition_mode: partition_mode.into(),
2603 null_equality: null_equality.into(),
2604 filter,
2605 projection: exec.projection.as_ref().map_or_else(Vec::new, |v| {
2606 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
2607 }),
2608 null_aware: exec.null_aware,
2609 dynamic_filter,
2610 },
2611 ))),
2612 })
2613 }
2614
2615 fn try_from_symmetric_hash_join_exec(
2616 exec: &SymmetricHashJoinExec,
2617 codec: &dyn PhysicalExtensionCodec,
2618 proto_converter: &dyn PhysicalProtoConverterExtension,
2619 ) -> Result<Self> {
2620 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2621 exec.left().to_owned(),
2622 codec,
2623 proto_converter,
2624 )?;
2625 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2626 exec.right().to_owned(),
2627 codec,
2628 proto_converter,
2629 )?;
2630 let on = exec
2631 .on()
2632 .iter()
2633 .map(|tuple| {
2634 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2635 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2636 Ok::<_, DataFusionError>(protobuf::JoinOn {
2637 left: Some(l),
2638 right: Some(r),
2639 })
2640 })
2641 .collect::<Result<_>>()?;
2642 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2643 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2644 let filter = exec
2645 .filter()
2646 .as_ref()
2647 .map(|f| {
2648 let expression =
2649 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2650 let column_indices = f
2651 .column_indices()
2652 .iter()
2653 .map(|i| {
2654 let side: protobuf::JoinSide = i.side.to_owned().into();
2655 protobuf::ColumnIndex {
2656 index: i.index as u32,
2657 side: side.into(),
2658 }
2659 })
2660 .collect();
2661 let schema = f.schema().as_ref().try_into()?;
2662 Ok(protobuf::JoinFilter {
2663 expression: Some(expression),
2664 column_indices,
2665 schema: Some(schema),
2666 })
2667 })
2668 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2669
2670 let partition_mode = match exec.partition_mode() {
2671 StreamJoinPartitionMode::SinglePartition => {
2672 protobuf::StreamPartitionMode::SinglePartition
2673 }
2674 StreamJoinPartitionMode::Partitioned => {
2675 protobuf::StreamPartitionMode::PartitionedExec
2676 }
2677 };
2678
2679 let left_sort_exprs = exec
2680 .left_sort_exprs()
2681 .map(|exprs| {
2682 exprs
2683 .iter()
2684 .map(|expr| {
2685 Ok(protobuf::PhysicalSortExprNode {
2686 expr: Some(Box::new(
2687 proto_converter
2688 .physical_expr_to_proto(&expr.expr, codec)?,
2689 )),
2690 asc: !expr.options.descending,
2691 nulls_first: expr.options.nulls_first,
2692 })
2693 })
2694 .collect::<Result<Vec<_>>>()
2695 })
2696 .transpose()?
2697 .unwrap_or(vec![]);
2698
2699 let right_sort_exprs = exec
2700 .right_sort_exprs()
2701 .map(|exprs| {
2702 exprs
2703 .iter()
2704 .map(|expr| {
2705 Ok(protobuf::PhysicalSortExprNode {
2706 expr: Some(Box::new(
2707 proto_converter
2708 .physical_expr_to_proto(&expr.expr, codec)?,
2709 )),
2710 asc: !expr.options.descending,
2711 nulls_first: expr.options.nulls_first,
2712 })
2713 })
2714 .collect::<Result<Vec<_>>>()
2715 })
2716 .transpose()?
2717 .unwrap_or(vec![]);
2718
2719 Ok(protobuf::PhysicalPlanNode {
2720 physical_plan_type: Some(PhysicalPlanType::SymmetricHashJoin(Box::new(
2721 protobuf::SymmetricHashJoinExecNode {
2722 left: Some(Box::new(left)),
2723 right: Some(Box::new(right)),
2724 on,
2725 join_type: join_type.into(),
2726 partition_mode: partition_mode.into(),
2727 null_equality: null_equality.into(),
2728 left_sort_exprs,
2729 right_sort_exprs,
2730 filter,
2731 },
2732 ))),
2733 })
2734 }
2735
2736 fn try_from_sort_merge_join_exec(
2737 exec: &SortMergeJoinExec,
2738 codec: &dyn PhysicalExtensionCodec,
2739 proto_converter: &dyn PhysicalProtoConverterExtension,
2740 ) -> Result<Self> {
2741 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2742 exec.left().to_owned(),
2743 codec,
2744 proto_converter,
2745 )?;
2746 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2747 exec.right().to_owned(),
2748 codec,
2749 proto_converter,
2750 )?;
2751 let on = exec
2752 .on()
2753 .iter()
2754 .map(|tuple| {
2755 let l = proto_converter.physical_expr_to_proto(&tuple.0, codec)?;
2756 let r = proto_converter.physical_expr_to_proto(&tuple.1, codec)?;
2757 Ok::<_, DataFusionError>(protobuf::JoinOn {
2758 left: Some(l),
2759 right: Some(r),
2760 })
2761 })
2762 .collect::<Result<_>>()?;
2763 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
2764 let null_equality: protobuf::NullEquality = exec.null_equality().into();
2765 let filter = exec
2766 .filter()
2767 .as_ref()
2768 .map(|f| {
2769 let expression =
2770 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
2771 let column_indices = f
2772 .column_indices()
2773 .iter()
2774 .map(|i| {
2775 let side: protobuf::JoinSide = i.side.to_owned().into();
2776 protobuf::ColumnIndex {
2777 index: i.index as u32,
2778 side: side.into(),
2779 }
2780 })
2781 .collect();
2782 let schema = f.schema().as_ref().try_into()?;
2783 Ok(protobuf::JoinFilter {
2784 expression: Some(expression),
2785 column_indices,
2786 schema: Some(schema),
2787 })
2788 })
2789 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
2790
2791 let sort_options = exec
2792 .sort_options()
2793 .iter()
2794 .map(
2795 |SortOptions {
2796 descending,
2797 nulls_first,
2798 }| {
2799 SortExprNode {
2800 expr: None,
2801 asc: !*descending,
2802 nulls_first: *nulls_first,
2803 }
2804 },
2805 )
2806 .collect();
2807
2808 Ok(protobuf::PhysicalPlanNode {
2809 physical_plan_type: Some(PhysicalPlanType::SortMergeJoin(Box::new(
2810 SortMergeJoinExecNode {
2811 left: Some(Box::new(left)),
2812 right: Some(Box::new(right)),
2813 on,
2814 join_type: join_type.into(),
2815 null_equality: null_equality.into(),
2816 filter,
2817 sort_options,
2818 },
2819 ))),
2820 })
2821 }
2822
2823 fn try_from_cross_join_exec(
2824 exec: &CrossJoinExec,
2825 codec: &dyn PhysicalExtensionCodec,
2826 proto_converter: &dyn PhysicalProtoConverterExtension,
2827 ) -> Result<Self> {
2828 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2829 exec.left().to_owned(),
2830 codec,
2831 proto_converter,
2832 )?;
2833 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2834 exec.right().to_owned(),
2835 codec,
2836 proto_converter,
2837 )?;
2838 Ok(protobuf::PhysicalPlanNode {
2839 physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
2840 protobuf::CrossJoinExecNode {
2841 left: Some(Box::new(left)),
2842 right: Some(Box::new(right)),
2843 },
2844 ))),
2845 })
2846 }
2847
2848 fn try_from_aggregate_exec(
2849 exec: &AggregateExec,
2850 codec: &dyn PhysicalExtensionCodec,
2851 proto_converter: &dyn PhysicalProtoConverterExtension,
2852 ) -> Result<Self> {
2853 let groups: Vec<bool> = exec
2854 .group_expr()
2855 .groups()
2856 .iter()
2857 .flatten()
2858 .copied()
2859 .collect();
2860
2861 let group_names = exec
2862 .group_expr()
2863 .expr()
2864 .iter()
2865 .map(|expr| expr.1.to_owned())
2866 .collect();
2867
2868 let filter = exec
2869 .filter_expr()
2870 .iter()
2871 .map(|expr| serialize_maybe_filter(expr.to_owned(), codec, proto_converter))
2872 .collect::<Result<Vec<_>>>()?;
2873
2874 let agg = exec
2875 .aggr_expr()
2876 .iter()
2877 .map(|expr| {
2878 serialize_physical_aggr_expr(expr.to_owned(), codec, proto_converter)
2879 })
2880 .collect::<Result<Vec<_>>>()?;
2881
2882 let agg_names = exec
2883 .aggr_expr()
2884 .iter()
2885 .map(|expr| expr.name().to_string())
2886 .collect::<Vec<_>>();
2887
2888 let agg_mode = match exec.mode() {
2889 AggregateMode::Partial => protobuf::AggregateMode::Partial,
2890 AggregateMode::Final => protobuf::AggregateMode::Final,
2891 AggregateMode::FinalPartitioned => protobuf::AggregateMode::FinalPartitioned,
2892 AggregateMode::Single => protobuf::AggregateMode::Single,
2893 AggregateMode::SinglePartitioned => {
2894 protobuf::AggregateMode::SinglePartitioned
2895 }
2896 AggregateMode::PartialReduce => protobuf::AggregateMode::PartialReduce,
2897 };
2898 let input_schema = exec.input_schema();
2899 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2900 exec.input().to_owned(),
2901 codec,
2902 proto_converter,
2903 )?;
2904
2905 let null_expr = exec
2906 .group_expr()
2907 .null_expr()
2908 .iter()
2909 .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
2910 .collect::<Result<Vec<_>>>()?;
2911
2912 let group_expr = exec
2913 .group_expr()
2914 .expr()
2915 .iter()
2916 .map(|expr| proto_converter.physical_expr_to_proto(&expr.0, codec))
2917 .collect::<Result<Vec<_>>>()?;
2918
2919 let limit = exec.limit_options().map(|config| protobuf::AggLimit {
2920 limit: config.limit() as u64,
2921 descending: config.descending(),
2922 });
2923
2924 Ok(protobuf::PhysicalPlanNode {
2925 physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
2926 protobuf::AggregateExecNode {
2927 group_expr,
2928 group_expr_name: group_names,
2929 aggr_expr: agg,
2930 filter_expr: filter,
2931 aggr_expr_name: agg_names,
2932 mode: agg_mode as i32,
2933 input: Some(Box::new(input)),
2934 input_schema: Some(input_schema.as_ref().try_into()?),
2935 null_expr,
2936 groups,
2937 limit,
2938 has_grouping_set: exec.group_expr().has_grouping_set(),
2939 dynamic_filter: exec
2940 .dynamic_filter_expr()
2941 .map(|df| {
2942 let df_expr: Arc<dyn PhysicalExpr> =
2943 Arc::clone(df) as Arc<dyn PhysicalExpr>;
2944 proto_converter.physical_expr_to_proto(&df_expr, codec)
2945 })
2946 .transpose()?,
2947 },
2948 ))),
2949 })
2950 }
2951
2952 fn try_from_empty_exec(
2953 empty: &EmptyExec,
2954 _codec: &dyn PhysicalExtensionCodec,
2955 ) -> Result<Self> {
2956 let schema = empty.schema().as_ref().try_into()?;
2957 Ok(protobuf::PhysicalPlanNode {
2958 physical_plan_type: Some(PhysicalPlanType::Empty(protobuf::EmptyExecNode {
2959 schema: Some(schema),
2960 })),
2961 })
2962 }
2963
2964 fn try_from_placeholder_row_exec(
2965 empty: &PlaceholderRowExec,
2966 _codec: &dyn PhysicalExtensionCodec,
2967 ) -> Result<Self> {
2968 let schema = empty.schema().as_ref().try_into()?;
2969 Ok(protobuf::PhysicalPlanNode {
2970 physical_plan_type: Some(PhysicalPlanType::PlaceholderRow(
2971 protobuf::PlaceholderRowExecNode {
2972 schema: Some(schema),
2973 },
2974 )),
2975 })
2976 }
2977
2978 #[expect(deprecated)]
2979 fn try_from_coalesce_batches_exec(
2980 coalesce_batches: &CoalesceBatchesExec,
2981 codec: &dyn PhysicalExtensionCodec,
2982 proto_converter: &dyn PhysicalProtoConverterExtension,
2983 ) -> Result<Self> {
2984 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
2985 coalesce_batches.input().to_owned(),
2986 codec,
2987 proto_converter,
2988 )?;
2989 Ok(protobuf::PhysicalPlanNode {
2990 physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
2991 protobuf::CoalesceBatchesExecNode {
2992 input: Some(Box::new(input)),
2993 target_batch_size: coalesce_batches.target_batch_size() as u32,
2994 fetch: coalesce_batches.fetch().map(|n| n as u32),
2995 },
2996 ))),
2997 })
2998 }
2999
3000 fn try_from_data_source_exec(
3001 data_source_exec: &DataSourceExec,
3002 codec: &dyn PhysicalExtensionCodec,
3003 proto_converter: &dyn PhysicalProtoConverterExtension,
3004 ) -> Result<Option<Self>> {
3005 let data_source = data_source_exec.data_source();
3006 if let Some(maybe_csv) = data_source.downcast_ref::<FileScanConfig>() {
3007 let source = maybe_csv.file_source();
3008 if let Some(csv_config) = source.downcast_ref::<CsvSource>() {
3009 return Ok(Some(protobuf::PhysicalPlanNode {
3010 physical_plan_type: Some(PhysicalPlanType::CsvScan(
3011 protobuf::CsvScanExecNode {
3012 base_conf: Some(serialize_file_scan_config(
3013 maybe_csv,
3014 codec,
3015 proto_converter,
3016 )?),
3017 has_header: csv_config.has_header(),
3018 delimiter: byte_to_string(
3019 csv_config.delimiter(),
3020 "delimiter",
3021 )?,
3022 quote: byte_to_string(csv_config.quote(), "quote")?,
3023 optional_escape: if let Some(escape) = csv_config.escape() {
3024 Some(
3025 protobuf::csv_scan_exec_node::OptionalEscape::Escape(
3026 byte_to_string(escape, "escape")?,
3027 ),
3028 )
3029 } else {
3030 None
3031 },
3032 optional_comment: if let Some(comment) = csv_config.comment()
3033 {
3034 Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
3035 byte_to_string(comment, "comment")?,
3036 ))
3037 } else {
3038 None
3039 },
3040 newlines_in_values: csv_config.newlines_in_values(),
3041 truncate_rows: csv_config.truncate_rows(),
3042 },
3043 )),
3044 }));
3045 }
3046 }
3047
3048 if let Some(scan_conf) = data_source.downcast_ref::<FileScanConfig>() {
3049 let source = scan_conf.file_source();
3050 if let Some(_json_source) = source.downcast_ref::<JsonSource>() {
3051 return Ok(Some(protobuf::PhysicalPlanNode {
3052 physical_plan_type: Some(PhysicalPlanType::JsonScan(
3053 protobuf::JsonScanExecNode {
3054 base_conf: Some(serialize_file_scan_config(
3055 scan_conf,
3056 codec,
3057 proto_converter,
3058 )?),
3059 },
3060 )),
3061 }));
3062 }
3063 }
3064
3065 if let Some(scan_conf) = data_source.downcast_ref::<FileScanConfig>() {
3066 let source = scan_conf.file_source();
3067 if let Some(_arrow_source) = source.downcast_ref::<ArrowSource>() {
3068 return Ok(Some(protobuf::PhysicalPlanNode {
3069 physical_plan_type: Some(PhysicalPlanType::ArrowScan(
3070 protobuf::ArrowScanExecNode {
3071 base_conf: Some(serialize_file_scan_config(
3072 scan_conf,
3073 codec,
3074 proto_converter,
3075 )?),
3076 },
3077 )),
3078 }));
3079 }
3080 }
3081
3082 #[cfg(feature = "parquet")]
3083 if let Some((maybe_parquet, conf)) =
3084 data_source_exec.downcast_to_file_source::<ParquetSource>()
3085 {
3086 let predicate = conf
3087 .filter()
3088 .map(|pred| proto_converter.physical_expr_to_proto(&pred, codec))
3089 .transpose()?;
3090 return Ok(Some(protobuf::PhysicalPlanNode {
3091 physical_plan_type: Some(PhysicalPlanType::ParquetScan(
3092 protobuf::ParquetScanExecNode {
3093 base_conf: Some(serialize_file_scan_config(
3094 maybe_parquet,
3095 codec,
3096 proto_converter,
3097 )?),
3098 predicate,
3099 parquet_options: Some(conf.table_parquet_options().try_into()?),
3100 },
3101 )),
3102 }));
3103 }
3104
3105 #[cfg(feature = "avro")]
3106 if let Some(maybe_avro) = data_source.downcast_ref::<FileScanConfig>() {
3107 let source = maybe_avro.file_source();
3108 if source.downcast_ref::<AvroSource>().is_some() {
3109 return Ok(Some(protobuf::PhysicalPlanNode {
3110 physical_plan_type: Some(PhysicalPlanType::AvroScan(
3111 protobuf::AvroScanExecNode {
3112 base_conf: Some(serialize_file_scan_config(
3113 maybe_avro,
3114 codec,
3115 proto_converter,
3116 )?),
3117 },
3118 )),
3119 }));
3120 }
3121 }
3122
3123 if let Some(source_conf) = data_source.downcast_ref::<MemorySourceConfig>() {
3124 let proto_partitions = source_conf
3125 .partitions()
3126 .iter()
3127 .map(|p| serialize_record_batches(p))
3128 .collect::<Result<Vec<_>>>()?;
3129
3130 let proto_schema: protobuf::Schema =
3131 source_conf.original_schema().as_ref().try_into()?;
3132
3133 let proto_projection = source_conf
3134 .projection()
3135 .as_ref()
3136 .map_or_else(Vec::new, |v| {
3137 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3138 });
3139
3140 let proto_sort_information = source_conf
3141 .sort_information()
3142 .iter()
3143 .map(|ordering| {
3144 let sort_exprs = serialize_physical_sort_exprs(
3145 ordering.to_owned(),
3146 codec,
3147 proto_converter,
3148 )?;
3149 Ok::<_, DataFusionError>(protobuf::PhysicalSortExprNodeCollection {
3150 physical_sort_expr_nodes: sort_exprs,
3151 })
3152 })
3153 .collect::<Result<Vec<_>, _>>()?;
3154
3155 return Ok(Some(protobuf::PhysicalPlanNode {
3156 physical_plan_type: Some(PhysicalPlanType::MemoryScan(
3157 protobuf::MemoryScanExecNode {
3158 partitions: proto_partitions,
3159 schema: Some(proto_schema),
3160 projection: proto_projection,
3161 sort_information: proto_sort_information,
3162 show_sizes: source_conf.show_sizes(),
3163 fetch: source_conf.fetch().map(|f| f as u32),
3164 },
3165 )),
3166 }));
3167 }
3168
3169 Ok(None)
3170 }
3171
3172 fn try_from_coalesce_partitions_exec(
3173 exec: &CoalescePartitionsExec,
3174 codec: &dyn PhysicalExtensionCodec,
3175 proto_converter: &dyn PhysicalProtoConverterExtension,
3176 ) -> Result<Self> {
3177 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3178 exec.input().to_owned(),
3179 codec,
3180 proto_converter,
3181 )?;
3182 Ok(protobuf::PhysicalPlanNode {
3183 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
3184 protobuf::CoalescePartitionsExecNode {
3185 input: Some(Box::new(input)),
3186 fetch: exec.fetch().map(|f| f as u32),
3187 },
3188 ))),
3189 })
3190 }
3191
3192 fn try_from_repartition_exec(
3193 exec: &RepartitionExec,
3194 codec: &dyn PhysicalExtensionCodec,
3195 proto_converter: &dyn PhysicalProtoConverterExtension,
3196 ) -> Result<Self> {
3197 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3198 exec.input().to_owned(),
3199 codec,
3200 proto_converter,
3201 )?;
3202
3203 let pb_partitioning =
3204 serialize_partitioning(exec.partitioning(), codec, proto_converter)?;
3205
3206 Ok(protobuf::PhysicalPlanNode {
3207 physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
3208 protobuf::RepartitionExecNode {
3209 input: Some(Box::new(input)),
3210 partitioning: Some(pb_partitioning),
3211 preserve_order: exec.preserve_order(),
3212 },
3213 ))),
3214 })
3215 }
3216
3217 fn try_from_sort_exec(
3218 exec: &SortExec,
3219 codec: &dyn PhysicalExtensionCodec,
3220 proto_converter: &dyn PhysicalProtoConverterExtension,
3221 ) -> Result<Self> {
3222 let input = proto_converter.execution_plan_to_proto(exec.input(), codec)?;
3223 let expr = exec
3224 .expr()
3225 .iter()
3226 .map(|expr| {
3227 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3228 expr: Some(Box::new(
3229 proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3230 )),
3231 asc: !expr.options.descending,
3232 nulls_first: expr.options.nulls_first,
3233 });
3234 Ok(protobuf::PhysicalExprNode {
3235 expr_id: None,
3236 expr_type: Some(ExprType::Sort(sort_expr)),
3237 })
3238 })
3239 .collect::<Result<Vec<_>>>()?;
3240 let dynamic_filter = exec
3241 .dynamic_filter_expr()
3242 .map(|df| {
3243 let df_expr: Arc<dyn PhysicalExpr> = df as Arc<dyn PhysicalExpr>;
3244 proto_converter.physical_expr_to_proto(&df_expr, codec)
3245 })
3246 .transpose()?;
3247
3248 Ok(protobuf::PhysicalPlanNode {
3249 physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
3250 protobuf::SortExecNode {
3251 input: Some(Box::new(input)),
3252 expr,
3253 fetch: match exec.fetch() {
3254 Some(n) => n as i64,
3255 _ => -1,
3256 },
3257 preserve_partitioning: exec.preserve_partitioning(),
3258 dynamic_filter,
3259 },
3260 ))),
3261 })
3262 }
3263
3264 fn try_from_union_exec(
3265 union: &UnionExec,
3266 codec: &dyn PhysicalExtensionCodec,
3267 proto_converter: &dyn PhysicalProtoConverterExtension,
3268 ) -> Result<Self> {
3269 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3270 for input in union.inputs() {
3271 inputs.push(
3272 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3273 input.to_owned(),
3274 codec,
3275 proto_converter,
3276 )?,
3277 );
3278 }
3279 Ok(protobuf::PhysicalPlanNode {
3280 physical_plan_type: Some(PhysicalPlanType::Union(protobuf::UnionExecNode {
3281 inputs,
3282 })),
3283 })
3284 }
3285
3286 fn try_from_interleave_exec(
3287 interleave: &InterleaveExec,
3288 codec: &dyn PhysicalExtensionCodec,
3289 proto_converter: &dyn PhysicalProtoConverterExtension,
3290 ) -> Result<Self> {
3291 let mut inputs: Vec<protobuf::PhysicalPlanNode> = vec![];
3292 for input in interleave.inputs() {
3293 inputs.push(
3294 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3295 input.to_owned(),
3296 codec,
3297 proto_converter,
3298 )?,
3299 );
3300 }
3301 Ok(protobuf::PhysicalPlanNode {
3302 physical_plan_type: Some(PhysicalPlanType::Interleave(
3303 protobuf::InterleaveExecNode { inputs },
3304 )),
3305 })
3306 }
3307
3308 fn try_from_sort_preserving_merge_exec(
3309 exec: &SortPreservingMergeExec,
3310 codec: &dyn PhysicalExtensionCodec,
3311 proto_converter: &dyn PhysicalProtoConverterExtension,
3312 ) -> Result<Self> {
3313 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3314 exec.input().to_owned(),
3315 codec,
3316 proto_converter,
3317 )?;
3318 let expr = exec
3319 .expr()
3320 .iter()
3321 .map(|expr| {
3322 let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
3323 expr: Some(Box::new(
3324 proto_converter.physical_expr_to_proto(&expr.expr, codec)?,
3325 )),
3326 asc: !expr.options.descending,
3327 nulls_first: expr.options.nulls_first,
3328 });
3329 Ok(protobuf::PhysicalExprNode {
3330 expr_id: None,
3331 expr_type: Some(ExprType::Sort(sort_expr)),
3332 })
3333 })
3334 .collect::<Result<Vec<_>>>()?;
3335 Ok(protobuf::PhysicalPlanNode {
3336 physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(Box::new(
3337 protobuf::SortPreservingMergeExecNode {
3338 input: Some(Box::new(input)),
3339 expr,
3340 fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
3341 },
3342 ))),
3343 })
3344 }
3345
3346 fn try_from_nested_loop_join_exec(
3347 exec: &NestedLoopJoinExec,
3348 codec: &dyn PhysicalExtensionCodec,
3349 proto_converter: &dyn PhysicalProtoConverterExtension,
3350 ) -> Result<Self> {
3351 let left = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3352 exec.left().to_owned(),
3353 codec,
3354 proto_converter,
3355 )?;
3356 let right = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3357 exec.right().to_owned(),
3358 codec,
3359 proto_converter,
3360 )?;
3361
3362 let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
3363 let filter = exec
3364 .filter()
3365 .as_ref()
3366 .map(|f| {
3367 let expression =
3368 proto_converter.physical_expr_to_proto(f.expression(), codec)?;
3369 let column_indices = f
3370 .column_indices()
3371 .iter()
3372 .map(|i| {
3373 let side: protobuf::JoinSide = i.side.to_owned().into();
3374 protobuf::ColumnIndex {
3375 index: i.index as u32,
3376 side: side.into(),
3377 }
3378 })
3379 .collect();
3380 let schema = f.schema().as_ref().try_into()?;
3381 Ok(protobuf::JoinFilter {
3382 expression: Some(expression),
3383 column_indices,
3384 schema: Some(schema),
3385 })
3386 })
3387 .map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
3388
3389 Ok(protobuf::PhysicalPlanNode {
3390 physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
3391 protobuf::NestedLoopJoinExecNode {
3392 left: Some(Box::new(left)),
3393 right: Some(Box::new(right)),
3394 join_type: join_type.into(),
3395 filter,
3396 projection: exec.projection().as_ref().map_or_else(Vec::new, |v| {
3397 v.iter().map(|x| *x as u32).collect::<Vec<u32>>()
3398 }),
3399 },
3400 ))),
3401 })
3402 }
3403
3404 fn try_from_window_agg_exec(
3405 exec: &WindowAggExec,
3406 codec: &dyn PhysicalExtensionCodec,
3407 proto_converter: &dyn PhysicalProtoConverterExtension,
3408 ) -> Result<Self> {
3409 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3410 exec.input().to_owned(),
3411 codec,
3412 proto_converter,
3413 )?;
3414
3415 let window_expr = exec
3416 .window_expr()
3417 .iter()
3418 .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3419 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3420
3421 let partition_keys = exec
3422 .partition_keys()
3423 .iter()
3424 .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3425 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3426
3427 Ok(protobuf::PhysicalPlanNode {
3428 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3429 protobuf::WindowAggExecNode {
3430 input: Some(Box::new(input)),
3431 window_expr,
3432 partition_keys,
3433 input_order_mode: None,
3434 },
3435 ))),
3436 })
3437 }
3438
3439 fn try_from_bounded_window_agg_exec(
3440 exec: &BoundedWindowAggExec,
3441 codec: &dyn PhysicalExtensionCodec,
3442 proto_converter: &dyn PhysicalProtoConverterExtension,
3443 ) -> Result<Self> {
3444 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3445 exec.input().to_owned(),
3446 codec,
3447 proto_converter,
3448 )?;
3449
3450 let window_expr = exec
3451 .window_expr()
3452 .iter()
3453 .map(|e| serialize_physical_window_expr(e, codec, proto_converter))
3454 .collect::<Result<Vec<protobuf::PhysicalWindowExprNode>>>()?;
3455
3456 let partition_keys = exec
3457 .partition_keys()
3458 .iter()
3459 .map(|e| proto_converter.physical_expr_to_proto(e, codec))
3460 .collect::<Result<Vec<protobuf::PhysicalExprNode>>>()?;
3461
3462 let input_order_mode = match &exec.input_order_mode {
3463 InputOrderMode::Linear => {
3464 window_agg_exec_node::InputOrderMode::Linear(protobuf::EmptyMessage {})
3465 }
3466 InputOrderMode::PartiallySorted(columns) => {
3467 window_agg_exec_node::InputOrderMode::PartiallySorted(
3468 protobuf::PartiallySortedInputOrderMode {
3469 columns: columns.iter().map(|c| *c as u64).collect(),
3470 },
3471 )
3472 }
3473 InputOrderMode::Sorted => {
3474 window_agg_exec_node::InputOrderMode::Sorted(protobuf::EmptyMessage {})
3475 }
3476 };
3477
3478 Ok(protobuf::PhysicalPlanNode {
3479 physical_plan_type: Some(PhysicalPlanType::Window(Box::new(
3480 protobuf::WindowAggExecNode {
3481 input: Some(Box::new(input)),
3482 window_expr,
3483 partition_keys,
3484 input_order_mode: Some(input_order_mode),
3485 },
3486 ))),
3487 })
3488 }
3489
3490 fn try_from_data_sink_exec(
3491 exec: &DataSinkExec,
3492 codec: &dyn PhysicalExtensionCodec,
3493 proto_converter: &dyn PhysicalProtoConverterExtension,
3494 ) -> Result<Option<Self>> {
3495 let input: protobuf::PhysicalPlanNode =
3496 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3497 exec.input().to_owned(),
3498 codec,
3499 proto_converter,
3500 )?;
3501 let sort_order = match exec.sort_order() {
3502 Some(requirements) => {
3503 let expr = requirements
3504 .iter()
3505 .map(|requirement| {
3506 let expr: PhysicalSortExpr = requirement.to_owned().into();
3507 let sort_expr = protobuf::PhysicalSortExprNode {
3508 expr: Some(Box::new(
3509 proto_converter
3510 .physical_expr_to_proto(&expr.expr, codec)?,
3511 )),
3512 asc: !expr.options.descending,
3513 nulls_first: expr.options.nulls_first,
3514 };
3515 Ok(sort_expr)
3516 })
3517 .collect::<Result<Vec<_>>>()?;
3518 Some(protobuf::PhysicalSortExprNodeCollection {
3519 physical_sort_expr_nodes: expr,
3520 })
3521 }
3522 None => None,
3523 };
3524
3525 if let Some(sink) = exec.sink().downcast_ref::<JsonSink>() {
3526 return Ok(Some(protobuf::PhysicalPlanNode {
3527 physical_plan_type: Some(PhysicalPlanType::JsonSink(Box::new(
3528 protobuf::JsonSinkExecNode {
3529 input: Some(Box::new(input)),
3530 sink: Some(sink.try_into()?),
3531 sink_schema: Some(exec.schema().as_ref().try_into()?),
3532 sort_order,
3533 },
3534 ))),
3535 }));
3536 }
3537
3538 if let Some(sink) = exec.sink().downcast_ref::<CsvSink>() {
3539 return Ok(Some(protobuf::PhysicalPlanNode {
3540 physical_plan_type: Some(PhysicalPlanType::CsvSink(Box::new(
3541 protobuf::CsvSinkExecNode {
3542 input: Some(Box::new(input)),
3543 sink: Some(sink.try_into()?),
3544 sink_schema: Some(exec.schema().as_ref().try_into()?),
3545 sort_order,
3546 },
3547 ))),
3548 }));
3549 }
3550
3551 #[cfg(feature = "parquet")]
3552 if let Some(sink) = exec.sink().downcast_ref::<ParquetSink>() {
3553 return Ok(Some(protobuf::PhysicalPlanNode {
3554 physical_plan_type: Some(PhysicalPlanType::ParquetSink(Box::new(
3555 protobuf::ParquetSinkExecNode {
3556 input: Some(Box::new(input)),
3557 sink: Some(sink.try_into()?),
3558 sink_schema: Some(exec.schema().as_ref().try_into()?),
3559 sort_order,
3560 },
3561 ))),
3562 }));
3563 }
3564
3565 Ok(None)
3567 }
3568
3569 fn try_from_unnest_exec(
3570 exec: &UnnestExec,
3571 codec: &dyn PhysicalExtensionCodec,
3572 proto_converter: &dyn PhysicalProtoConverterExtension,
3573 ) -> Result<Self> {
3574 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3575 exec.input().to_owned(),
3576 codec,
3577 proto_converter,
3578 )?;
3579
3580 Ok(protobuf::PhysicalPlanNode {
3581 physical_plan_type: Some(PhysicalPlanType::Unnest(Box::new(
3582 protobuf::UnnestExecNode {
3583 input: Some(Box::new(input)),
3584 schema: Some(exec.schema().try_into()?),
3585 list_type_columns: exec
3586 .list_column_indices()
3587 .iter()
3588 .map(|c| ProtoListUnnest {
3589 index_in_input_schema: c.index_in_input_schema as _,
3590 depth: c.depth as _,
3591 })
3592 .collect(),
3593 struct_type_columns: exec
3594 .struct_column_indices()
3595 .iter()
3596 .map(|c| *c as _)
3597 .collect(),
3598 options: Some(exec.options().into()),
3599 },
3600 ))),
3601 })
3602 }
3603
3604 fn try_from_cooperative_exec(
3605 exec: &CooperativeExec,
3606 codec: &dyn PhysicalExtensionCodec,
3607 proto_converter: &dyn PhysicalProtoConverterExtension,
3608 ) -> Result<Self> {
3609 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3610 exec.input().to_owned(),
3611 codec,
3612 proto_converter,
3613 )?;
3614
3615 Ok(protobuf::PhysicalPlanNode {
3616 physical_plan_type: Some(PhysicalPlanType::Cooperative(Box::new(
3617 protobuf::CooperativeExecNode {
3618 input: Some(Box::new(input)),
3619 },
3620 ))),
3621 })
3622 }
3623
3624 fn str_to_generate_series_name(name: &str) -> Result<protobuf::GenerateSeriesName> {
3625 match name {
3626 "generate_series" => Ok(protobuf::GenerateSeriesName::GsGenerateSeries),
3627 "range" => Ok(protobuf::GenerateSeriesName::GsRange),
3628 _ => internal_err!("unknown name: {name}"),
3629 }
3630 }
3631
3632 fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result<Option<Self>> {
3633 let generators = exec.generators();
3634
3635 let [generator] = generators.as_slice() else {
3637 return Ok(None);
3638 };
3639
3640 let generator_guard = generator.read();
3641
3642 if let Some(empty_gen) = generator_guard.as_any().downcast_ref::<Empty>() {
3644 let schema = exec.schema();
3645 let node = protobuf::GenerateSeriesNode {
3646 schema: Some(schema.as_ref().try_into()?),
3647 target_batch_size: 8192, args: Some(protobuf::generate_series_node::Args::ContainsNull(
3649 protobuf::GenerateSeriesArgsContainsNull {
3650 name: Self::str_to_generate_series_name(empty_gen.name())? as i32,
3651 },
3652 )),
3653 };
3654
3655 return Ok(Some(protobuf::PhysicalPlanNode {
3656 physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3657 }));
3658 }
3659
3660 if let Some(int_64) = generator_guard
3661 .as_any()
3662 .downcast_ref::<GenericSeriesState<i64>>()
3663 {
3664 let schema = exec.schema();
3665 let node = protobuf::GenerateSeriesNode {
3666 schema: Some(schema.as_ref().try_into()?),
3667 target_batch_size: int_64.batch_size() as u32,
3668 args: Some(protobuf::generate_series_node::Args::Int64Args(
3669 protobuf::GenerateSeriesArgsInt64 {
3670 start: *int_64.start(),
3671 end: *int_64.end(),
3672 step: *int_64.step(),
3673 include_end: int_64.include_end(),
3674 name: Self::str_to_generate_series_name(int_64.name())? as i32,
3675 },
3676 )),
3677 };
3678
3679 return Ok(Some(protobuf::PhysicalPlanNode {
3680 physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3681 }));
3682 }
3683
3684 if let Some(timestamp_args) = generator_guard
3685 .as_any()
3686 .downcast_ref::<GenericSeriesState<TimestampValue>>()
3687 {
3688 let schema = exec.schema();
3689
3690 let start = timestamp_args.start().value();
3691 let end = timestamp_args.end().value();
3692
3693 let step_value = timestamp_args.step();
3694
3695 let step = Some(datafusion_proto_common::IntervalMonthDayNanoValue {
3696 months: step_value.months,
3697 days: step_value.days,
3698 nanos: step_value.nanoseconds,
3699 });
3700 let include_end = timestamp_args.include_end();
3701 let name = Self::str_to_generate_series_name(timestamp_args.name())? as i32;
3702
3703 let args = match timestamp_args.current().tz_str() {
3704 Some(tz) => protobuf::generate_series_node::Args::TimestampArgs(
3705 protobuf::GenerateSeriesArgsTimestamp {
3706 start,
3707 end,
3708 step,
3709 include_end,
3710 name,
3711 tz: Some(tz.to_string()),
3712 },
3713 ),
3714 None => protobuf::generate_series_node::Args::DateArgs(
3715 protobuf::GenerateSeriesArgsDate {
3716 start,
3717 end,
3718 step,
3719 include_end,
3720 name,
3721 },
3722 ),
3723 };
3724
3725 let node = protobuf::GenerateSeriesNode {
3726 schema: Some(schema.as_ref().try_into()?),
3727 target_batch_size: timestamp_args.batch_size() as u32,
3728 args: Some(args),
3729 };
3730
3731 return Ok(Some(protobuf::PhysicalPlanNode {
3732 physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)),
3733 }));
3734 }
3735
3736 Ok(None)
3737 }
3738
3739 fn try_from_async_func_exec(
3740 exec: &AsyncFuncExec,
3741 codec: &dyn PhysicalExtensionCodec,
3742 proto_converter: &dyn PhysicalProtoConverterExtension,
3743 ) -> Result<Self> {
3744 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3745 Arc::clone(exec.input()),
3746 codec,
3747 proto_converter,
3748 )?;
3749
3750 let mut async_exprs = vec![];
3751 let mut async_expr_names = vec![];
3752
3753 for async_expr in exec.async_exprs() {
3754 async_exprs
3755 .push(proto_converter.physical_expr_to_proto(&async_expr.func, codec)?);
3756 async_expr_names.push(async_expr.name.clone())
3757 }
3758
3759 Ok(protobuf::PhysicalPlanNode {
3760 physical_plan_type: Some(PhysicalPlanType::AsyncFunc(Box::new(
3761 protobuf::AsyncFuncExecNode {
3762 input: Some(Box::new(input)),
3763 async_exprs,
3764 async_expr_names,
3765 },
3766 ))),
3767 })
3768 }
3769
3770 fn try_from_buffer_exec(
3771 exec: &BufferExec,
3772 extension_codec: &dyn PhysicalExtensionCodec,
3773 proto_converter: &dyn PhysicalProtoConverterExtension,
3774 ) -> Result<Self> {
3775 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3776 Arc::clone(exec.input()),
3777 extension_codec,
3778 proto_converter,
3779 )?;
3780
3781 Ok(protobuf::PhysicalPlanNode {
3782 physical_plan_type: Some(PhysicalPlanType::Buffer(Box::new(
3783 protobuf::BufferExecNode {
3784 input: Some(Box::new(input)),
3785 capacity: exec.capacity() as u64,
3786 },
3787 ))),
3788 })
3789 }
3790
3791 fn try_from_scalar_subquery_exec(
3792 exec: &ScalarSubqueryExec,
3793 codec: &dyn PhysicalExtensionCodec,
3794 proto_converter: &dyn PhysicalProtoConverterExtension,
3795 ) -> Result<Self> {
3796 let input = protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3797 Arc::clone(exec.input()),
3798 codec,
3799 proto_converter,
3800 )?;
3801 let subqueries = exec
3802 .subqueries()
3803 .iter()
3804 .map(|sq| {
3805 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
3806 Arc::clone(&sq.plan),
3807 codec,
3808 proto_converter,
3809 )
3810 })
3811 .collect::<Result<Vec<_>>>()?;
3812
3813 Ok(protobuf::PhysicalPlanNode {
3814 physical_plan_type: Some(PhysicalPlanType::ScalarSubquery(Box::new(
3815 protobuf::ScalarSubqueryExecNode {
3816 input: Some(Box::new(input)),
3817 subqueries,
3818 },
3819 ))),
3820 })
3821 }
3822}
3823
/// Conversion between a serialized physical-plan representation and a
/// DataFusion [`ExecutionPlan`].
///
/// Operators without a built-in encoding are delegated to the supplied
/// [`PhysicalExtensionCodec`].
/// NOTE(review): presumably implemented by `protobuf::PhysicalPlanNode`.
/// Confirm where the impl lives.
pub trait AsExecutionPlan: Debug + Send + Sync + Clone {
    /// Decodes an instance from its serialized bytes.
    fn try_decode(buf: &[u8]) -> Result<Self>
    where
        Self: Sized;

    /// Encodes `self` into `buf`.
    fn try_encode<B>(&self, buf: &mut B) -> Result<()>
    where
        B: BufMut,
        Self: Sized;

    /// Converts this serialized representation into an executable plan.
    ///
    /// `codec` handles nodes that have no built-in encoding.
    fn try_into_physical_plan(
        &self,
        ctx: &TaskContext,

        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Builds the serialized representation of `plan`.
    ///
    /// `codec` handles operators that have no built-in encoding.
    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized;
}
3848
/// Hooks for (de)serializing execution plans, physical expressions and
/// user-defined functions that have no built-in protobuf representation.
///
/// Only [`Self::try_decode`] and [`Self::try_encode`] must be implemented.
/// The other hooks have defaults:
/// - the `try_encode_udf` / `try_encode_udaf` / `try_encode_udwf` defaults
///   succeed without writing anything to `buf`;
/// - the matching decoders and both expression hooks return a
///   "not implemented" error.
pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
    /// Decodes an extension execution plan from `buf`.
    ///
    /// NOTE(review): `inputs` presumably holds the node's already-decoded
    /// child plans. Confirm against the caller.
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Encodes the extension execution plan `node` into `buf`.
    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()>;

    /// Decodes the scalar UDF named `name` from `buf`.
    ///
    /// The default returns a "not implemented" error naming the function.
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for scalar function {name}")
    }

    /// Encodes the scalar UDF `node` into `buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes an extension physical expression from `buf`.
    ///
    /// `inputs` holds the expressions passed in by the caller. The default
    /// returns a "not implemented" error.
    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
    ) -> Result<Arc<dyn PhysicalExpr>> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Encodes the extension physical expression `node` into `buf`.
    ///
    /// The default returns a "not implemented" error.
    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
    ) -> Result<()> {
        not_impl_err!("PhysicalExtensionCodec is not provided")
    }

    /// Decodes the aggregate UDF named `name` from `buf`.
    ///
    /// The default returns a "not implemented" error naming the function.
    fn try_decode_udaf(&self, name: &str, _buf: &[u8]) -> Result<Arc<AggregateUDF>> {
        not_impl_err!(
            "PhysicalExtensionCodec is not provided for aggregate function {name}"
        )
    }

    /// Encodes the aggregate UDF `node` into `buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udaf(&self, _node: &AggregateUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }

    /// Decodes the window UDF named `name` from `buf`.
    ///
    /// The default returns a "not implemented" error naming the function.
    fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
        not_impl_err!("PhysicalExtensionCodec is not provided for window function {name}")
    }

    /// Encodes the window UDF `node` into `buf`.
    ///
    /// The default writes nothing and succeeds.
    fn try_encode_udwf(&self, _node: &WindowUDF, _buf: &mut Vec<u8>) -> Result<()> {
        Ok(())
    }
}
3901
3902#[derive(Debug)]
3903pub struct DefaultPhysicalExtensionCodec {}
3904
3905impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
3906 fn try_decode(
3907 &self,
3908 _buf: &[u8],
3909 _inputs: &[Arc<dyn ExecutionPlan>],
3910 _ctx: &TaskContext,
3911 ) -> Result<Arc<dyn ExecutionPlan>> {
3912 not_impl_err!("PhysicalExtensionCodec is not provided")
3913 }
3914
3915 fn try_encode(
3916 &self,
3917 _node: Arc<dyn ExecutionPlan>,
3918 _buf: &mut Vec<u8>,
3919 ) -> Result<()> {
3920 not_impl_err!("PhysicalExtensionCodec is not provided")
3921 }
3922}
3923
/// Customization point for converting between protobuf nodes and physical
/// plans / expressions.
///
/// The (de)serialization code in this module routes child plans and
/// expressions through these methods. An implementation can therefore
/// intercept or alter individual conversions. The `default_*` helpers expose
/// the built-in decoding so implementations can fall back to it.
pub trait PhysicalProtoConverterExtension {
    /// Decodes a protobuf plan node into an execution plan.
    fn proto_to_execution_plan(
        &self,
        proto: &protobuf::PhysicalPlanNode,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// Built-in plan decoding, passing `self` as the converter for any nested
    /// conversions.
    fn default_proto_to_execution_plan(
        &self,
        proto: &protobuf::PhysicalPlanNode,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn ExecutionPlan>>
    where
        Self: Sized,
    {
        proto.try_into_physical_plan_with_context(ctx, self)
    }

    /// Encodes an execution plan into a protobuf plan node.
    ///
    /// `codec` handles operators that have no built-in encoding.
    fn execution_plan_to_proto(
        &self,
        plan: &Arc<dyn ExecutionPlan>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>;

    /// Decodes a protobuf expression node against `input_schema`.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        input_schema: &Schema,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn PhysicalExpr>>;

    /// Built-in expression decoding, passing `self` as the converter for any
    /// nested conversions.
    fn default_proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        input_schema: &Schema,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn PhysicalExpr>>
    where
        Self: Sized,
    {
        parse_physical_expr_with_converter(proto, input_schema, ctx, self)
    }

    /// Encodes a physical expression into a protobuf expression node.
    ///
    /// `codec` handles expressions that have no built-in encoding.
    fn physical_expr_to_proto(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode>;
}
3976
/// Wire envelope written by `ComposedPhysicalExtensionCodec`.
///
/// It records which of the composed codecs produced a payload, so that the
/// same codec can decode it later.
#[derive(Clone, PartialEq, prost::Message)]
struct DataEncoderTuple {
    /// Index, within the composed codec list, of the codec that produced
    /// `blob`.
    #[prost(uint32, tag = 1)]
    pub encoder_position: u32,

    /// Bytes produced by that codec.
    #[prost(bytes, tag = 2)]
    pub blob: Vec<u8>,
}
3989
3990pub struct DefaultPhysicalProtoConverter {}
3991
3992impl PhysicalProtoConverterExtension for DefaultPhysicalProtoConverter {
3993 fn proto_to_execution_plan(
3994 &self,
3995 proto: &protobuf::PhysicalPlanNode,
3996 ctx: &PhysicalPlanDecodeContext<'_>,
3997 ) -> Result<Arc<dyn ExecutionPlan>> {
3998 proto.try_into_physical_plan_with_context(ctx, self)
3999 }
4000
4001 fn execution_plan_to_proto(
4002 &self,
4003 plan: &Arc<dyn ExecutionPlan>,
4004 codec: &dyn PhysicalExtensionCodec,
4005 ) -> Result<protobuf::PhysicalPlanNode>
4006 where
4007 Self: Sized,
4008 {
4009 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
4010 Arc::clone(plan),
4011 codec,
4012 self,
4013 )
4014 }
4015
4016 fn proto_to_physical_expr(
4017 &self,
4018 proto: &protobuf::PhysicalExprNode,
4019 input_schema: &Schema,
4020 ctx: &PhysicalPlanDecodeContext<'_>,
4021 ) -> Result<Arc<dyn PhysicalExpr>>
4022 where
4023 Self: Sized,
4024 {
4025 parse_physical_expr_with_converter(proto, input_schema, ctx, self)
4027 }
4028
4029 fn physical_expr_to_proto(
4030 &self,
4031 expr: &Arc<dyn PhysicalExpr>,
4032 codec: &dyn PhysicalExtensionCodec,
4033 ) -> Result<protobuf::PhysicalExprNode> {
4034 serialize_physical_expr_with_converter(expr, codec, self)
4035 }
4036}
4037
/// Decode-only converter that reuses previously decoded expressions which
/// share the same protobuf `expr_id`.
///
/// Both serialization hooks return internal errors. The cache lives as long
/// as this value, so an instance spans a single decoding pass.
/// `DeduplicatingProtoConverter` creates a fresh instance for each decoding
/// call.
#[derive(Default)]
struct DeduplicatingDeserializer {
    /// Expressions decoded so far, keyed by their protobuf `expr_id`.
    cache: RefCell<HashMap<u64, Arc<dyn PhysicalExpr>>>,
}

impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
    /// Decodes a plan, passing `self` so nested expressions share this cache.
    fn proto_to_execution_plan(
        &self,
        proto: &protobuf::PhysicalPlanNode,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        proto.try_into_physical_plan_with_context(ctx, self)
    }

    /// Always fails: this converter only supports decoding.
    fn execution_plan_to_proto(
        &self,
        _plan: &Arc<dyn ExecutionPlan>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalPlanNode>
    where
        Self: Sized,
    {
        internal_err!("DeduplicatingDeserializer cannot serialize execution plans")
    }

    /// Decodes an expression.
    ///
    /// If an expression with the same `expr_id` was decoded earlier, the
    /// cached instance is returned instead, rebuilt with the children that
    /// were just decoded.
    fn proto_to_physical_expr(
        &self,
        proto: &protobuf::PhysicalExprNode,
        input_schema: &Schema,
        ctx: &PhysicalPlanDecodeContext<'_>,
    ) -> Result<Arc<dyn PhysicalExpr>> 
    where
        Self: Sized,
    {
        // Expressions without an id are never deduplicated.
        let Some(id) = proto.expr_id else {
            return parse_physical_expr_with_converter(proto, input_schema, ctx, self);
        };

        // Decode before borrowing the cache. Decoding passes `self` as the
        // converter and may recurse into this method for child expressions,
        // which would need their own `borrow_mut`.
        let parsed = parse_physical_expr_with_converter(proto, input_schema, ctx, self)?;

        let mut cache = self.cache.borrow_mut();
        if let Some(cached) = cache.get(&id) {
            // Reuse the cached `Arc` but attach the freshly decoded children.
            // NOTE(review): presumably this keeps state shared through the
            // cached instance (e.g. dynamic filters) intact. Confirm intent.
            let children: Vec<_> = parsed.children().into_iter().cloned().collect();
            return Arc::clone(cached).with_new_children(children);
        }

        // First occurrence of this id: remember it for later nodes.
        cache.insert(id, Arc::clone(&parsed));
        Ok(parsed)
    }

    /// Always fails: this converter only supports decoding.
    fn physical_expr_to_proto(
        &self,
        _expr: &Arc<dyn PhysicalExpr>,
        _codec: &dyn PhysicalExtensionCodec,
    ) -> Result<protobuf::PhysicalExprNode> {
        internal_err!("DeduplicatingDeserializer cannot serialize physical expressions")
    }
}
4109
4110#[derive(Debug, Default, Clone, Copy)]
4119pub struct DeduplicatingProtoConverter {}
4120
4121impl PhysicalProtoConverterExtension for DeduplicatingProtoConverter {
4122 fn proto_to_execution_plan(
4123 &self,
4124 proto: &protobuf::PhysicalPlanNode,
4125 ctx: &PhysicalPlanDecodeContext<'_>,
4126 ) -> Result<Arc<dyn ExecutionPlan>> {
4127 let deserializer = DeduplicatingDeserializer::default();
4128 proto.try_into_physical_plan_with_context(ctx, &deserializer)
4129 }
4130
4131 fn execution_plan_to_proto(
4132 &self,
4133 plan: &Arc<dyn ExecutionPlan>,
4134 codec: &dyn PhysicalExtensionCodec,
4135 ) -> Result<protobuf::PhysicalPlanNode>
4136 where
4137 Self: Sized,
4138 {
4139 protobuf::PhysicalPlanNode::try_from_physical_plan_with_converter(
4140 Arc::clone(plan),
4141 codec,
4142 self,
4143 )
4144 }
4145
4146 fn proto_to_physical_expr(
4147 &self,
4148 proto: &protobuf::PhysicalExprNode,
4149 input_schema: &Schema,
4150 ctx: &PhysicalPlanDecodeContext<'_>,
4151 ) -> Result<Arc<dyn PhysicalExpr>>
4152 where
4153 Self: Sized,
4154 {
4155 let deserializer = DeduplicatingDeserializer::default();
4156 deserializer.proto_to_physical_expr(proto, input_schema, ctx)
4157 }
4158
4159 fn physical_expr_to_proto(
4160 &self,
4161 expr: &Arc<dyn PhysicalExpr>,
4162 codec: &dyn PhysicalExtensionCodec,
4163 ) -> Result<protobuf::PhysicalExprNode> {
4164 serialize_physical_expr_with_converter(expr, codec, self)
4165 }
4166}
4167
4168#[derive(Debug)]
4171pub struct ComposedPhysicalExtensionCodec {
4172 codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
4173}
4174
4175impl ComposedPhysicalExtensionCodec {
4176 pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
4179 Self { codecs }
4180 }
4181
4182 fn decode_protobuf<R>(
4183 &self,
4184 buf: &[u8],
4185 decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
4186 ) -> Result<R> {
4187 let proto =
4188 DataEncoderTuple::decode(buf).map_err(|e| internal_datafusion_err!("{e}"))?;
4189
4190 let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
4191 internal_datafusion_err!("Can't find required codec in codec list"),
4192 )?;
4193
4194 decode(codec.as_ref(), &proto.blob)
4195 }
4196
4197 fn encode_protobuf(
4198 &self,
4199 buf: &mut Vec<u8>,
4200 mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
4201 ) -> Result<()> {
4202 let mut data = vec![];
4203 let mut last_err = None;
4204 let mut encoder_position = None;
4205
4206 for (position, codec) in self.codecs.iter().enumerate() {
4208 match encode(codec.as_ref(), &mut data) {
4209 Ok(_) => {
4210 encoder_position = Some(position as u32);
4211 break;
4212 }
4213 Err(err) => last_err = Some(err),
4214 }
4215 }
4216
4217 let encoder_position = encoder_position.ok_or_else(|| {
4218 last_err.unwrap_or_else(|| {
4219 DataFusionError::NotImplemented(
4220 "Empty list of composed codecs".to_owned(),
4221 )
4222 })
4223 })?;
4224
4225 let proto = DataEncoderTuple {
4227 encoder_position,
4228 blob: data,
4229 };
4230 proto
4231 .encode(buf)
4232 .map_err(|e| internal_datafusion_err!("{e}"))
4233 }
4234}
4235
4236impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
4237 fn try_decode(
4238 &self,
4239 buf: &[u8],
4240 inputs: &[Arc<dyn ExecutionPlan>],
4241 ctx: &TaskContext,
4242 ) -> Result<Arc<dyn ExecutionPlan>> {
4243 self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, ctx))
4244 }
4245
4246 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
4247 self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
4248 }
4249
4250 fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
4251 self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
4252 }
4253
4254 fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
4255 self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
4256 }
4257
4258 fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
4259 self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
4260 }
4261
4262 fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
4263 self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
4264 }
4265}
4266
4267fn into_physical_plan(
4268 node: &Option<Box<protobuf::PhysicalPlanNode>>,
4269 ctx: &PhysicalPlanDecodeContext<'_>,
4270 proto_converter: &dyn PhysicalProtoConverterExtension,
4271) -> Result<Arc<dyn ExecutionPlan>> {
4272 if let Some(field) = node {
4273 proto_converter.proto_to_execution_plan(field, ctx)
4274 } else {
4275 Err(proto_error("Missing required field in protobuf"))
4276 }
4277}