polars_mem_engine/planner/lp.rs

use polars_core::POOL;
use polars_core::prelude::*;
use polars_expr::state::ExecutionState;
use polars_plan::plans::expr_ir::ExprIR;
use polars_plan::prelude::sink::CallbackSinkType;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

#[cfg(feature = "python")]
use self::python_dsl::PythonScanSource;
use super::*;
use crate::executors::{self, CachePrefiller, Executor, GroupByStreamingExec, SinkExecutor};
use crate::scan_predicate::functions::create_scan_predicate;

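/// Builds an executor backed by the streaming engine for the subplan rooted at the
/// given IR node. Used for nodes the in-memory engine delegates, such as file and
/// partitioned sinks, most scans and partitionable group-bys.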
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;

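/// Returns `true` if this group_by can be executed as a partitioned (pre-aggregated)
/// group_by.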
fn partitionable_gb(
    keys: &[ExprIR],
    aggs: &[ExprIR],
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
    apply: &Option<PlanCallback<DataFrame, DataFrame>>,
) -> bool {
    // checks:
    //      1. complex expressions in the group_by keys are not partitionable;
    //          in this case anything more than col("foo")
    //      2. a custom function cannot be partitioned
    //      3. we don't bother with more than 2 keys, as the cardinality likely explodes
    //         with the combinations
    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
        // complex expressions in the group_by itself are also not partitionable
        // in this case anything more than col("foo")
        for key in keys {
            if (expr_arena).iter(key.node()).count() > 1
                || has_aexpr(key.node(), expr_arena, |ae| match ae {
                    AExpr::Literal(lv) => !lv.is_scalar(),
                    _ => false,
                })
            {
                return false;
            }
        }

        can_pre_agg_exprs(aggs, expr_arena, input_schema)
    } else {
        false
    }
}

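/// State threaded through the IR-to-executor conversion, tracking whether the node
/// currently being converted sits below a `Cache` node (`has_cache_parent`) and
/// whether the branch converted so far contains one (`has_cache_child`).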
#[derive(Clone)]
struct ConversionState {
    has_cache_child: bool,
    has_cache_parent: bool,
}

impl ConversionState {
    fn new() -> PolarsResult<Self> {
        Ok(ConversionState {
            has_cache_child: false,
            has_cache_parent: false,
        })
    }

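    /// Run `func` with a copy of the state whose `has_cache_child` flag is reset;
    /// the flag observed inside the branch is written back to `self` afterwards.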
    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
        let mut new_state = self.clone();
        new_state.has_cache_child = false;
        let out = func(&mut new_state);
        self.has_cache_child = new_state.has_cache_child;
        out
    }
}

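/// Convert the IR plan rooted at `root` into a physical plan for the in-memory engine.
/// Any `Cache` nodes encountered are collected and the plan is wrapped in a
/// `CachePrefiller` so the caches are filled before the main plan executes.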
pub fn create_physical_plan(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plan = create_physical_plan_impl(
        root,
        lp_arena,
        expr_arena,
        &mut state,
        &mut cache_nodes,
        build_streaming_executor,
    )?;

    if cache_nodes.is_empty() {
        Ok(plan)
    } else {
        Ok(Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: plan,
        }))
    }
}

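/// Physical plans for several sink roots, plus an optional executor that prefills the
/// caches those plans share.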
pub struct MultiplePhysicalPlans {
    pub cache_prefiller: Option<Box<dyn Executor>>,
    pub physical_plans: Vec<Box<dyn Executor>>,
}
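
/// Like [`create_physical_plan`], but converts several sink roots at once so that
/// caches shared between the plans are prefilled only once, via the returned
/// `cache_prefiller`.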
pub fn create_multiple_physical_plans(
    roots: &[Node],
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<MultiplePhysicalPlans> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plans = state.with_new_branch(|new_state| {
        roots
            .iter()
            .map(|&node| {
                create_physical_plan_impl(
                    node,
                    lp_arena,
                    expr_arena,
                    new_state,
                    &mut cache_nodes,
                    build_streaming_executor,
                )
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
        struct Empty;
        impl Executor for Empty {
            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
                Ok(DataFrame::empty())
            }
        }
        Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: Box::new(Empty),
        }) as _
    });

    Ok(MultiplePhysicalPlans {
        cache_prefiller,
        physical_plans: plans,
    })
}

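/// Convert the predicate of a Python scan. For pyarrow sources the predicate is first
/// converted to a pyarrow eval string stored in `options`; if that fails, or for other
/// sources, a physical expression is created. Returns the physical predicate (if any)
/// together with the serialized DSL predicate for non-pyarrow sources.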
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        // Convert to a pyarrow eval string.
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            use polars_core::config::verbose_print_sensitive;

            let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            );

            verbose_print_sensitive(|| {
                format!(
                    "python_scan_predicate: \
                    predicate node: {}, \
                    converted pyarrow predicate: {}",
                    ExprIRDisplay::display_node(e.node(), expr_arena),
                    &predicate_pa.as_deref().unwrap_or("<conversion failed>")
                )
            });

            if let Some(eval_str) = predicate_pa {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // We don't have to use a physical expression as pyarrow deals with the filter.
                None
            } else {
                Some(create_physical_expr(e, expr_arena, &options.schema, state)?)
            }
        }
        // Convert to a physical expression in case the reader cannot consume the predicate.
        else {
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(e, expr_arena, &options.schema, state)?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}

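/// Recursively convert the IR node `root` and its inputs into a physical executor.
/// `cache_nodes` collects cache prefill executors in order of discovery so the caller
/// can wrap the plan in a `CachePrefiller`.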
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    // Cache nodes in order of discovery
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    let get_streaming_executor_builder = || {
        build_streaming_executor.expect(
            "get_streaming_executor_builder() failed (hint: missing feature new-streaming?)",
        )
    };

    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

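    // Nodes that may be visited again (anything below a cache, caches themselves, or
    // nodes later handed to the streaming engine by node id) must stay in the arena
    // and are cloned; everything else is taken out of the arena.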
    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. } // Needed for the streaming impl
                | IR::Cache { .. } // Needed for plans branching from the same cache node
                | IR::GroupBy { .. } // Needed for the streaming impl
                | IR::Sink { // Needed for the streaming impl
                    payload:
                        SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. },
                    ..
                }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => match payload {
            SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                input: recurse!(input, state)?,
                name: PlSmallStr::from_static("mem"),
                f: Box::new(move |df, _state| Ok(Some(df))),
            })),
            SinkTypeIR::Callback(CallbackSinkType {
                function,
                maintain_order: _,
                chunk_size,
            }) => {
                let chunk_size = chunk_size.map_or(usize::MAX, Into::into);

                Ok(Box::new(SinkExecutor {
                    input: recurse!(input, state)?,
                    name: PlSmallStr::from_static("batches"),
                    f: Box::new(move |mut buffer, _state| {
                        while buffer.height() > 0 {
                            let df;
                            (df, buffer) = buffer.split_at(buffer.height().min(chunk_size) as i64);
                            let should_stop = function.call(df)?;
                            if should_stop {
                                break;
                            }
                        }
                        Ok(Some(DataFrame::empty()))
                    }),
                }))
            },
            SinkTypeIR::File(_) | SinkTypeIR::Partitioned { .. } => {
                get_streaming_executor_builder()(root, lp_arena, expr_arena)
            },
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate =
                create_physical_expr(&predicate, expr_arena, &input_schema, &mut state)?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            predicate_file_skip_applied,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

            let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();
            #[cfg(feature = "parquet")]
            {
                if let FileScanIR::Parquet { options, .. } = scan_type.as_ref() {
                    create_skip_batch_predicate |= options.use_statistics;
                }
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None, // hive_schema
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                #[cfg_attr(
                    not(any(
                        feature = "parquet",
                        feature = "ipc",
                        feature = "csv",
                        feature = "json",
                        feature = "scan_lines"
                    )),
                    expect(unreachable_patterns)
                )]
                _ => get_streaming_executor_builder()(root, lp_arena, expr_arena),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr =
                create_physical_expressions_from_irs(&expr, expr_arena, &input_schema, &mut state)?;

            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))
                // If all columns are literal we would get 1 row per thread.
                && !phys_expr.iter().all(|p| {
                    p.is_literal()
                });

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
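        // The first time a cache id is seen, a prefill node is created and registered in
        // `cache_nodes`; later occurrences of the same id only attach another executor
        // reading from the existing prefill.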
        Cache { input, id } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

            if let Some(cache) = cache_nodes.get_mut(&id) {
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema: output_schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    output_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // We check at the latest possible moment whether we can partition the group_by.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let builder = get_streaming_executor_builder();

                let input = recurse!(input, state)?;

                let gb_root = if state.has_cache_parent {
                    lp_arena.add(lp_arena.get(root).clone())
                } else {
                    root
                };

                let executor = Box::new(GroupByStreamingExec::new(
                    input,
                    builder,
                    gb_root,
                    lp_arena,
                    expr_arena,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    output_schema,
                    _slice,
                    from_partitioned_ds,
                ));

                Ok(executor)
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    output_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            // TODO: remove the force option. It can deadlock.
            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Convert the join options to physical join options. This requires the physical
            // planner, so we do this at the last minute.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df.filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        // Check that reusing the same cache node doesn't panic.
        // CSE creates duplicate cache nodes with the same ID, but cloud reuses them.

        let mut ir = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan = ir.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let cache = ir.add(IR::Cache {
            input: scan,
            id: UniqueId::new(),
        });

        let left_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });
        let right_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });

        let _multiplan = create_multiple_physical_plans(
            &[left_sink, right_sink],
            &mut ir,
            &mut Arena::new(),
            None,
        )
        .unwrap();
    }
}