use polars_core::POOL;
use polars_core::prelude::*;
use polars_expr::state::ExecutionState;
use polars_plan::plans::expr_ir::ExprIR;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

#[cfg(feature = "python")]
use self::python_dsl::PythonScanSource;
use super::*;
use crate::executors::{
    self, CachePrefiller, Executor, GroupByStreamingExec, PartitionedSinkExecutor, SinkExecutor,
    sink_name,
};
use crate::scan_predicate::functions::create_scan_predicate;

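/// Builds a streaming [`Executor`] for an IR node; only provided when the
/// new-streaming engine is compiled in.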
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;

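/// Whether a group-by can run as a partitioned (pre-aggregated) group-by:
/// at most two simple keys, no `apply` callback, and aggregations that
/// support pre-aggregation.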
fn partitionable_gb(
    keys: &[ExprIR],
    aggs: &[ExprIR],
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
    apply: &Option<PlanCallback<DataFrame, DataFrame>>,
) -> bool {
    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
        for key in keys {
            if expr_arena.iter(key.node()).count() > 1
                || has_aexpr(key.node(), expr_arena, |ae| match ae {
                    AExpr::Literal(lv) => !lv.is_scalar(),
                    _ => false,
                })
            {
                return false;
            }
        }

        can_pre_agg_exprs(aggs, expr_arena, input_schema)
    } else {
        false
    }
}

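/// State threaded through the IR walk. `has_cache_parent` forces nodes to be
/// cloned (rather than taken) out of the arena because they may be revisited;
/// `has_cache_child` records that the current branch contains a cache.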
#[derive(Clone)]
struct ConversionState {
    has_cache_child: bool,
    has_cache_parent: bool,
}

impl ConversionState {
    fn new() -> PolarsResult<Self> {
        Ok(ConversionState {
            has_cache_child: false,
            has_cache_parent: false,
        })
    }

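    /// Runs `func` with `has_cache_child` reset, so each branch (e.g. the
    /// sides of a join or the inputs of a union) tracks its own caches; the
    /// branch result is propagated back afterwards.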
    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
        let mut new_state = self.clone();
        new_state.has_cache_child = false;
        let out = func(&mut new_state);
        self.has_cache_child = new_state.has_cache_child;
        out
    }
}

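/// Lowers the IR rooted at `root` to a physical plan. If any cache-like
/// prefill nodes were registered during conversion, the plan is wrapped in a
/// [`CachePrefiller`] so they can be executed up front.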
pub fn create_physical_plan(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plan = create_physical_plan_impl(
        root,
        lp_arena,
        expr_arena,
        &mut state,
        &mut cache_nodes,
        build_streaming_executor,
    )?;

    if cache_nodes.is_empty() {
        Ok(plan)
    } else {
        Ok(Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: plan,
        }))
    }
}

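/// Result of lowering multiple sink roots at once: one physical plan per root,
/// plus an optional prefiller for shared caches that should run first.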
pub struct MultiplePhysicalPlans {
    pub cache_prefiller: Option<Box<dyn Executor>>,
    pub physical_plans: Vec<Box<dyn Executor>>,
}

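/// Lowers several sink roots that share one IR arena (and possibly cache
/// nodes) into independent physical plans plus a shared cache prefiller.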
pub fn create_multiple_physical_plans(
    roots: &[Node],
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<MultiplePhysicalPlans> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plans = state.with_new_branch(|new_state| {
        roots
            .iter()
            .map(|&node| {
                create_physical_plan_impl(
                    node,
                    lp_arena,
                    expr_arena,
                    new_state,
                    &mut cache_nodes,
                    build_streaming_executor,
                )
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
        struct Empty;
        impl Executor for Empty {
            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
                Ok(DataFrame::empty())
            }
        }
        Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: Box::new(Empty),
        }) as _
    });

    Ok(MultiplePhysicalPlans {
        cache_prefiller,
        physical_plans: plans,
    })
}

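/// Converts the predicate of a Python scan. For pyarrow sources the predicate
/// is translated to a pyarrow expression where possible; otherwise a physical
/// expression (and, for non-pyarrow sources, a serialized DSL predicate) is
/// produced.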
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            use polars_core::config::verbose_print_sensitive;

            let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            );

            verbose_print_sensitive(|| {
                format!(
                    "python_scan_predicate: \
                    predicate node: {}, \
                    converted pyarrow predicate: {}",
                    ExprIRDisplay::display_node(e.node(), expr_arena),
                    &predicate_pa.as_deref().unwrap_or("<conversion failed>")
                )
            });

            if let Some(eval_str) = predicate_pa {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                None
            } else {
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        } else {
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}

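/// Recursive worker for [`create_physical_plan`]. Cache-like nodes (caches,
/// streaming scans, partitioned group-bys, partition sinks) are registered in
/// `cache_nodes` so they can be prefilled before the main plan runs.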
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    macro_rules! recurse {
        ($node:expr, $state:expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

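    // Nodes that may be needed again (anything below a cache, plus nodes that
    // are re-lowered by the streaming engine: scans, caches, group-bys and
    // partition sinks) must stay in the arena, so clone them; everything else
    // can be taken out.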
    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. }
                | IR::Cache { .. }
                | IR::GroupBy { .. }
                | IR::Sink {
                    payload: SinkTypeIR::Partition(_),
                    ..
                }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => {
            let input = recurse!(input, state)?;
            match payload {
                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                    input,
                    name: "mem".to_string(),
                    f: Box::new(move |df, _state| Ok(Some(df))),
                })),
                SinkTypeIR::Callback(CallbackSinkType {
                    function,
                    maintain_order: _,
                    chunk_size,
                }) => {
                    let chunk_size = chunk_size.map_or(usize::MAX, Into::into);

                    Ok(Box::new(SinkExecutor {
                        input,
                        name: "batches".to_string(),
                        f: Box::new(move |mut buffer, _state| {
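                            // Feed the callback `chunk_size`-row slices; a
                            // `true` return value requests early termination.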
                            while !buffer.is_empty() {
                                let df;
                                (df, buffer) =
                                    buffer.split_at(buffer.height().min(chunk_size) as i64);
                                let should_stop = function.call(df)?;
                                if should_stop {
                                    break;
                                }
                            }
                            Ok(Some(DataFrame::empty()))
                        }),
                    }))
                },
                SinkTypeIR::File(FileSinkType {
                    file_type,
                    target,
                    sink_options,
                    cloud_options,
                }) => {
                    let name = sink_name(&file_type).to_owned();
                    Ok(Box::new(SinkExecutor {
                        input,
                        name,
                        f: Box::new(move |mut df, _state| {
                            let mut file = target
                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
                            let writer = &mut *file;

                            use std::io::BufWriter;
                            match &file_type {
                                #[cfg(feature = "parquet")]
                                FileType::Parquet(options) => {
                                    use polars_io::parquet::write::ParquetWriter;
                                    ParquetWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_statistics(options.statistics)
                                        .with_row_group_size(options.row_group_size)
                                        .with_data_page_size(options.data_page_size)
                                        .with_key_value_metadata(options.key_value_metadata.clone())
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "ipc")]
                                FileType::Ipc(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::ipc::IpcWriter;
                                    IpcWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_compat_level(options.compat_level)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "csv")]
                                FileType::Csv(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::csv::write::CsvWriter;
                                    CsvWriter::new(BufWriter::new(writer))
                                        .include_bom(options.include_bom)
                                        .include_header(options.include_header)
                                        .with_separator(options.serialize_options.separator)
                                        .with_line_terminator(
                                            options.serialize_options.line_terminator.clone(),
                                        )
                                        .with_quote_char(options.serialize_options.quote_char)
                                        .with_batch_size(options.batch_size)
                                        .with_datetime_format(
                                            options.serialize_options.datetime_format.clone(),
                                        )
                                        .with_date_format(
                                            options.serialize_options.date_format.clone(),
                                        )
                                        .with_time_format(
                                            options.serialize_options.time_format.clone(),
                                        )
                                        .with_float_scientific(
                                            options.serialize_options.float_scientific,
                                        )
                                        .with_float_precision(
                                            options.serialize_options.float_precision,
                                        )
                                        .with_decimal_comma(options.serialize_options.decimal_comma)
                                        .with_null_value(options.serialize_options.null.clone())
                                        .with_quote_style(options.serialize_options.quote_style)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "json")]
                                FileType::Json(_options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::json::{JsonFormat, JsonWriter};

                                    JsonWriter::new(BufWriter::new(writer))
                                        .with_json_format(JsonFormat::JsonLines)
                                        .finish(&mut df)?;
                                },
                                #[allow(unreachable_patterns)]
                                _ => panic!("enable filetype feature"),
                            }

                            file.sync_on_close(sink_options.sync_on_close)?;
                            file.close()?;

                            Ok(None)
                        }),
                    }))
                },
                SinkTypeIR::Partition(_) => {
                    let builder = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = Box::new(PartitionedSinkExecutor::new(
                        input, builder, root, lp_arena, expr_arena,
                    ));

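                    // Register the partition sink as a prefill node so the
                    // `CachePrefiller` runs it ahead of the main plan.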
                    let mut prefill = executors::CachePrefill::new_sink(executor);
                    let exec = prefill.make_exec();
                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
            }
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
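            // Elementwise predicates can be applied in a streaming fashion.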
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate = create_physical_expr(
                &predicate,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            predicate_file_skip_applied,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

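            // A skip-batch predicate is worthwhile when table statistics are
            // available, or (with the parquet feature) when parquet statistics
            // are enabled.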
            let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();
            #[cfg(feature = "parquet")]
            {
                create_skip_batch_predicate |= matches!(
                    &*scan_type,
                    FileScanIR::Parquet {
                        options: polars_io::prelude::ParquetOptions {
                            use_statistics: true,
                            ..
                        },
                        ..
                    }
                );
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None,
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                #[allow(unreachable_patterns)]
                _ => {
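                    // All other scan types are executed by the streaming
                    // engine and registered as prefill nodes.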
                    state.has_cache_parent = true;
                    state.has_cache_child = true;

                    let build_func = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = build_func(root, lp_arena, expr_arena)?;

                    let mut prefill = executors::CachePrefill::new_scan(executor);
                    let exec = prefill.make_exec();

                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
                #[allow(unreachable_patterns)]
                _ => unreachable!(),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr = create_physical_expressions_from_irs(
                &expr,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;

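            // Vertical (per-chunk) parallelism only pays off when every
            // expression is elementwise and not all of them are plain
            // literals.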
            let allow_vertical_parallelism = options.should_broadcast
                && expr
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena))
                && !phys_expr.iter().all(|p| p.is_literal());

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                Context::Default,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
        Cache { input, id } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

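            // The first visit of a cache id converts its input and registers
            // a prefill entry; later visits only create another exec handle
            // for the same entry.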
            if let Some(cache) = cache_nodes.get_mut(&id) {
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema: _,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                Context::Aggregation,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

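            // Partitionable group-bys are lowered to the streaming engine and
            // registered as prefill nodes; everything else falls back to the
            // generic in-memory GroupByExec.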
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let builder =
                    build_streaming_executor.expect("invalid build. Missing feature new-streaming");

                let input = recurse!(input, state)?;
                let executor = Box::new(GroupByStreamingExec::new(
                    input,
                    builder,
                    root,
                    lp_arena,
                    expr_arena,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    _slice,
                    from_partitioned_ds,
                ));

                let mut prefill = executors::CachePrefill::new_sink(executor);
                let exec = prefill.make_exec();
                let existing = cache_nodes.insert(prefill.id(), prefill);

                assert!(existing.is_none());

                Ok(Box::new(exec))
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                Context::Default,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                Context::Default,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

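            // Compile the optional extra join predicate into a closure that
            // filters the joined frame.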
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            Context::Default,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df._filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
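        // Two memory sinks share a single cache node; the planner must
        // register the cache once and still build both plans.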
        let mut ir = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan = ir.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let cache = ir.add(IR::Cache {
            input: scan,
            id: UniqueId::new(),
        });

        let left_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });
        let right_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });

        let _multiplan = create_multiple_physical_plans(
            &[left_sink, right_sink],
            &mut ir,
            &mut Arena::new(),
            None,
        )
        .unwrap();
    }
}