1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::plans::expr_ir::ExprIR;
5use polars_plan::prelude::sink::CallbackSinkType;
6use polars_utils::unique_id::UniqueId;
7use recursive::recursive;
8
9#[cfg(feature = "python")]
10use self::python_dsl::PythonScanSource;
11use super::*;
12use crate::executors::{self, CachePrefiller, Executor, GroupByStreamingExec, SinkExecutor};
13use crate::scan_predicate::functions::create_scan_predicate;
14
/// Builder that lowers an IR node (plus its arenas) into a streaming-engine
/// executor; used as a fallback for operations this engine delegates
/// (file sinks, file-format scans, partitionable group-bys).
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
17
18fn partitionable_gb(
19 keys: &[ExprIR],
20 aggs: &[ExprIR],
21 input_schema: &Schema,
22 expr_arena: &Arena<AExpr>,
23 apply: &Option<PlanCallback<DataFrame, DataFrame>>,
24) -> bool {
25 if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
32 for key in keys {
35 if (expr_arena).iter(key.node()).count() > 1
36 || has_aexpr(key.node(), expr_arena, |ae| match ae {
37 AExpr::Literal(lv) => !lv.is_scalar(),
38 _ => false,
39 })
40 {
41 return false;
42 }
43 }
44
45 can_pre_agg_exprs(aggs, expr_arena, input_schema)
46 } else {
47 false
48 }
49}
50
/// Bookkeeping carried along while lowering the IR tree into executors.
#[derive(Clone)]
struct ConversionState {
    // Set when a `Cache` node was encountered in the subtree currently being
    // converted (propagated back up across branches by `with_new_branch`).
    has_cache_child: bool,
    // Set once a `Cache` node has been visited on the path from the root;
    // nodes below a cache must be cloned (not taken) out of the IR arena.
    has_cache_parent: bool,
}
56
57impl ConversionState {
58 fn new() -> PolarsResult<Self> {
59 Ok(ConversionState {
60 has_cache_child: false,
61 has_cache_parent: false,
62 })
63 }
64
65 fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
66 let mut new_state = self.clone();
67 new_state.has_cache_child = false;
68 let out = func(&mut new_state);
69 self.has_cache_child = new_state.has_cache_child;
70 out
71 }
72}
73
74pub fn create_physical_plan(
75 root: Node,
76 lp_arena: &mut Arena<IR>,
77 expr_arena: &mut Arena<AExpr>,
78 build_streaming_executor: Option<StreamingExecutorBuilder>,
79) -> PolarsResult<Box<dyn Executor>> {
80 let mut state = ConversionState::new()?;
81 let mut cache_nodes = Default::default();
82 let plan = create_physical_plan_impl(
83 root,
84 lp_arena,
85 expr_arena,
86 &mut state,
87 &mut cache_nodes,
88 build_streaming_executor,
89 )?;
90
91 if cache_nodes.is_empty() {
92 Ok(plan)
93 } else {
94 Ok(Box::new(CachePrefiller {
95 caches: cache_nodes,
96 phys_plan: plan,
97 }))
98 }
99}
100
/// Result of converting several sink roots in one go.
pub struct MultiplePhysicalPlans {
    /// Executor that prefills the caches shared between the plans; `None`
    /// when the plans contain no cache nodes.
    pub cache_prefiller: Option<Box<dyn Executor>>,
    /// One physical plan per root passed to `create_multiple_physical_plans`.
    pub physical_plans: Vec<Box<dyn Executor>>,
}
105pub fn create_multiple_physical_plans(
106 roots: &[Node],
107 lp_arena: &mut Arena<IR>,
108 expr_arena: &mut Arena<AExpr>,
109 build_streaming_executor: Option<StreamingExecutorBuilder>,
110) -> PolarsResult<MultiplePhysicalPlans> {
111 let mut state = ConversionState::new()?;
112 let mut cache_nodes = Default::default();
113 let plans = state.with_new_branch(|new_state| {
114 roots
115 .iter()
116 .map(|&node| {
117 create_physical_plan_impl(
118 node,
119 lp_arena,
120 expr_arena,
121 new_state,
122 &mut cache_nodes,
123 build_streaming_executor,
124 )
125 })
126 .collect::<PolarsResult<Vec<_>>>()
127 })?;
128
129 let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
130 struct Empty;
131 impl Executor for Empty {
132 fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
133 Ok(DataFrame::empty())
134 }
135 }
136 Box::new(CachePrefiller {
137 caches: cache_nodes,
138 phys_plan: Box::new(Empty),
139 }) as _
140 });
141
142 Ok(MultiplePhysicalPlans {
143 cache_prefiller,
144 physical_plans: plans,
145 })
146}
147
/// Prepare the predicate of a Python scan.
///
/// For a PyArrow-backed source, first try to translate the Polars predicate
/// into a PyArrow expression string so it can be pushed into the reader; on
/// success the predicate is rewritten in `options` and no physical expression
/// is returned. Otherwise (or for non-PyArrow sources) a physical expression
/// is built for post-scan filtering, and for non-PyArrow sources the DSL
/// predicate is additionally serialized so the Python side can use it.
///
/// Returns `(physical_predicate, serialized_dsl_predicate)` — either side may
/// be `None` depending on the path taken above.
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            use polars_core::config::verbose_print_sensitive;

            // Attempt Polars-expression -> PyArrow-expression conversion.
            let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            );

            // Logged lazily and only in (sensitive) verbose mode.
            verbose_print_sensitive(|| {
                format!(
                    "python_scan_predicate: \
                    predicate node: {}, \
                    converted pyarrow predicate: {}",
                    ExprIRDisplay::display_node(e.node(), expr_arena),
                    &predicate_pa.as_deref().unwrap_or("<conversion failed>")
                )
            });

            if let Some(eval_str) = predicate_pa {
                // Pushdown succeeded: the reader evaluates the predicate, so
                // no physical expression is needed on our side.
                options.predicate = PythonPredicate::PyArrow(eval_str);
                None
            } else {
                // Conversion failed: fall back to filtering after the scan.
                Some(create_physical_expr(e, expr_arena, &options.schema, state)?)
            }
        } else {
            // Non-PyArrow sources: serialize the DSL predicate for Python and
            // also build a physical expression for filtering.
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(e, expr_arena, &options.schema, state)?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}
201
/// Recursively lower the IR node `root` (and its inputs) into an executable
/// physical plan.
///
/// * `state` tracks cache relationships along the current branch.
/// * `cache_nodes` collects one `CachePrefill` per distinct cache id so a
///   shared subplan is converted and executed only once.
/// * `build_streaming_executor` is the fallback used for operations that are
///   delegated to the streaming engine (file sinks, file-format scans,
///   partitionable group-bys).
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    // Panics (with a hint) if a streaming-only operation is hit while no
    // builder was supplied.
    let get_streaming_executor_builder = || {
        build_streaming_executor.expect(
            "get_streaming_executor_builder() failed (hint: missing feature new-streaming?)",
        )
    };

    // Recurse with the same arenas / cache map, but a caller-chosen state.
    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    // Nodes under a cache may be visited more than once, and scans, caches,
    // group-bys and file sinks may later be handed to the streaming builder
    // via their `Node` — those must stay intact in the arena (clone).
    // Everything else can be taken out destructively.
    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. }
                | IR::Cache { .. }
                | IR::GroupBy { .. }
                | IR::Sink {
                    payload: SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. },
                    ..
                }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => match payload {
            // In-memory sink: pass the materialized frame straight through.
            SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                input: recurse!(input, state)?,
                name: PlSmallStr::from_static("mem"),
                f: Box::new(move |df, _state| Ok(Some(df))),
            })),
            SinkTypeIR::Callback(CallbackSinkType {
                function,
                maintain_order: _,
                chunk_size,
            }) => {
                // No configured chunk size means "hand everything over at once".
                let chunk_size = chunk_size.map_or(usize::MAX, Into::into);

                Ok(Box::new(SinkExecutor {
                    input: recurse!(input, state)?,
                    name: PlSmallStr::from_static("batches"),
                    f: Box::new(move |mut buffer, _state| {
                        // Feed the callback chunk by chunk; it can stop early
                        // by returning `true`.
                        while buffer.height() > 0 {
                            let df;
                            (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);
                            let should_stop = function.call(df)?;
                            if should_stop {
                                break;
                            }
                        }
                        Ok(Some(DataFrame::empty()))
                    }),
                }))
            },
            // File / partitioned sinks only exist in the streaming engine.
            SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. } => {
                get_streaming_executor_builder()(root, lp_arena, expr_arena)
            },
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            // Every union input is an independent branch for cache tracking.
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            // Purely elementwise predicates can be applied streamable/vertically.
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate =
                create_physical_expr(&predicate, expr_arena, &input_schema, &mut state)?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            predicate_file_skip_applied,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

            // Build a batch-skipping predicate when statistics are available
            // (table statistics, or parquet statistics when enabled).
            let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();
            #[cfg(feature = "parquet")]
            {
                if let FileScanIR::Parquet { options, .. } = scan_type.as_ref() {
                    create_skip_batch_predicate |= options.use_statistics;
                }
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None,
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                // All file-format scans are delegated to the streaming engine.
                #[cfg_attr(
                    not(any(
                        feature = "parquet",
                        feature = "ipc",
                        feature = "csv",
                        feature = "json",
                        feature = "scan_lines"
                    )),
                    expect(unreachable_patterns)
                )]
                _ => get_streaming_executor_builder()(root, lp_arena, expr_arena),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr =
                create_physical_expressions_from_irs(&expr, expr_arena, &input_schema, &mut state)?;

            // Vertical parallelism requires broadcasting elementwise
            // expressions, and is pointless when they are all literals.
            let allow_vertical_parallelism = options.should_broadcast
                && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))
                && !phys_expr.iter().all(|p| {
                    p.is_literal()
                });

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            // Projection pushed down by the optimizer, if any.
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
        Cache { input, id } => {
            // From here down we are under a cache; from here up there is one.
            state.has_cache_parent = true;
            state.has_cache_child = true;

            if let Some(cache) = cache_nodes.get_mut(&id) {
                // The cached subplan was already converted — attach another
                // reader to the existing prefill.
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema: output_schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            // Take ownership of the options, cloning only if still shared.
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            // Dynamic (temporal) group-by has a dedicated executor.
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Rolling group-by likewise.
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Partitionable group-bys run through the streaming engine.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let builder = get_streaming_executor_builder();

                let input = recurse!(input, state)?;

                // Under a cache the original node must stay intact in the
                // arena, so hand the builder a fresh copy instead of `root`.
                let gb_root = if state.has_cache_parent {
                    lp_arena.add(lp_arena.get(root).clone())
                } else {
                    root
                };

                let executor = Box::new(GroupByStreamingExec::new(
                    input,
                    builder,
                    gb_root,
                    lp_arena,
                    expr_arena,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    output_schema,
                    _slice,
                    from_partitioned_ds,
                ));

                Ok(executor)
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    output_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            // The two join sides are independent branches for cache tracking.
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            // `force_parallel` overrides `allow_parallel`.
            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Compile extra join-type options: each contained expression is
            // turned into a closure that filters a DataFrame by the boolean
            // result of evaluating that expression on the joined schema.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df.filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            // Same rule as `Select`: vertical parallelism only for
            // broadcasting, fully elementwise expressions.
            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            // Like `Join`: both inputs are independent branches.
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}
739
#[cfg(test)]
mod tests {
    use super::*;

    /// Converting two sinks that share one cache node must succeed: the
    /// second conversion of the cache reuses the prefill registered by the
    /// first instead of re-lowering (or re-taking) the cached subtree.
    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        let mut ir_arena = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let df_scan = ir_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let shared_cache = ir_arena.add(IR::Cache {
            input: df_scan,
            id: UniqueId::new(),
        });

        // Two memory sinks, both reading from the same cache node.
        let sinks: Vec<Node> = (0..2)
            .map(|_| {
                ir_arena.add(IR::Sink {
                    input: shared_cache,
                    payload: SinkTypeIR::Memory,
                })
            })
            .collect();

        let _multiplan =
            create_multiple_physical_plans(&sinks, &mut ir_arena, &mut Arena::new(), None)
                .unwrap();
    }
}