polars_mem_engine/planner/
lp.rs

1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::plans::expr_ir::ExprIR;
5use polars_utils::format_pl_smallstr;
6use polars_utils::unique_id::UniqueId;
7use recursive::recursive;
8
9use self::expr_ir::OutputName;
10use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
11#[cfg(feature = "python")]
12use self::python_dsl::PythonScanSource;
13use super::*;
14use crate::ScanPredicate;
15use crate::executors::{
16    self, CachePrefiller, Executor, PartitionedSinkExecutor, SinkExecutor, sink_name,
17};
18use crate::predicate::PhysicalColumnPredicates;
19
/// Builder that lowers an IR node into a streaming `Executor`.
/// NOTE(review): appears to be provided only when the new-streaming feature is
/// enabled (see the `expect("… Missing feature new-streaming")` call sites).
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
22
23fn partitionable_gb(
24    keys: &[ExprIR],
25    aggs: &[ExprIR],
26    input_schema: &Schema,
27    expr_arena: &Arena<AExpr>,
28    apply: &Option<Arc<dyn DataFrameUdf>>,
29) -> bool {
30    // checks:
31    //      1. complex expressions in the group_by itself are also not partitionable
32    //          in this case anything more than col("foo")
33    //      2. a custom function cannot be partitioned
34    //      3. we don't bother with more than 2 keys, as the cardinality likely explodes
35    //         by the combinations
36    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
37        // complex expressions in the group_by itself are also not partitionable
38        // in this case anything more than col("foo")
39        for key in keys {
40            if (expr_arena).iter(key.node()).count() > 1
41                || has_aexpr(key.node(), expr_arena, |ae| match ae {
42                    AExpr::Literal(lv) => !lv.is_scalar(),
43                    _ => false,
44                })
45            {
46                return false;
47            }
48        }
49
50        can_pre_agg_exprs(aggs, expr_arena, input_schema)
51    } else {
52        false
53    }
54}
55
/// Flags threaded through physical-plan conversion to track cache placement.
#[derive(Clone)]
struct ConversionState {
    // Set when a cache node was created somewhere below the current node.
    has_cache_child: bool,
    // Set when a cache node exists above the current node; this forces the
    // converter to `clone` IR nodes out of the arena instead of `take`-ing
    // them, since the subtree may be traversed again from another branch.
    has_cache_parent: bool,
}
61
62impl ConversionState {
63    fn new() -> PolarsResult<Self> {
64        Ok(ConversionState {
65            has_cache_child: false,
66            has_cache_parent: false,
67        })
68    }
69
70    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
71        let mut new_state = self.clone();
72        new_state.has_cache_child = false;
73        let out = func(&mut new_state);
74        self.has_cache_child = new_state.has_cache_child;
75        out
76    }
77}
78
79pub fn create_physical_plan(
80    root: Node,
81    lp_arena: &mut Arena<IR>,
82    expr_arena: &mut Arena<AExpr>,
83    build_streaming_executor: Option<StreamingExecutorBuilder>,
84) -> PolarsResult<Box<dyn Executor>> {
85    let mut state = ConversionState::new()?;
86    let mut cache_nodes = Default::default();
87    let plan = create_physical_plan_impl(
88        root,
89        lp_arena,
90        expr_arena,
91        &mut state,
92        &mut cache_nodes,
93        build_streaming_executor,
94    )?;
95
96    if cache_nodes.is_empty() {
97        Ok(plan)
98    } else {
99        Ok(Box::new(CachePrefiller {
100            caches: cache_nodes,
101            phys_plan: plan,
102        }))
103    }
104}
105
/// Result of `create_multiple_physical_plans`: one executor per root, plus an
/// optional prefiller that populates caches shared between the plans.
pub struct MultiplePhysicalPlans {
    /// Executes all discovered cache nodes (with an empty terminal plan);
    /// `None` when no cache nodes were found.
    pub cache_prefiller: Option<Box<dyn Executor>>,
    /// One physical executor per input root, in input order.
    pub physical_plans: Vec<Box<dyn Executor>>,
}
110pub fn create_multiple_physical_plans(
111    roots: &[Node],
112    lp_arena: &mut Arena<IR>,
113    expr_arena: &mut Arena<AExpr>,
114    build_streaming_executor: Option<StreamingExecutorBuilder>,
115) -> PolarsResult<MultiplePhysicalPlans> {
116    let mut state = ConversionState::new()?;
117    let mut cache_nodes = Default::default();
118    let plans = state.with_new_branch(|new_state| {
119        roots
120            .iter()
121            .map(|&node| {
122                create_physical_plan_impl(
123                    node,
124                    lp_arena,
125                    expr_arena,
126                    new_state,
127                    &mut cache_nodes,
128                    build_streaming_executor,
129                )
130            })
131            .collect::<PolarsResult<Vec<_>>>()
132    })?;
133
134    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
135        struct Empty;
136        impl Executor for Empty {
137            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
138                Ok(DataFrame::empty())
139            }
140        }
141        Box::new(CachePrefiller {
142            caches: cache_nodes,
143            phys_plan: Box::new(Empty),
144        }) as _
145    });
146
147    Ok(MultiplePhysicalPlans {
148        cache_prefiller,
149        physical_plans: plans,
150    })
151}
152
153#[cfg(feature = "python")]
154#[allow(clippy::type_complexity)]
155pub fn python_scan_predicate(
156    options: &mut PythonOptions,
157    expr_arena: &Arena<AExpr>,
158    state: &mut ExpressionConversionState,
159) -> PolarsResult<(
160    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
161    Option<Vec<u8>>,
162)> {
163    let mut predicate_serialized = None;
164    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
165        // Convert to a pyarrow eval string.
166        if matches!(options.python_source, PythonScanSource::Pyarrow) {
167            if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
168                e.node(),
169                expr_arena,
170                Default::default(),
171            ) {
172                options.predicate = PythonPredicate::PyArrow(eval_str);
173                // We don't have to use a physical expression as pyarrow deals with the filter.
174                None
175            } else {
176                Some(create_physical_expr(
177                    e,
178                    Context::Default,
179                    expr_arena,
180                    &options.schema,
181                    state,
182                )?)
183            }
184        }
185        // Convert to physical expression for the case the reader cannot consume the predicate.
186        else {
187            let dsl_expr = e.to_expr(expr_arena);
188            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;
189
190            Some(create_physical_expr(
191                e,
192                Context::Default,
193                expr_arena,
194                &options.schema,
195                state,
196            )?)
197        }
198    } else {
199        None
200    };
201
202    Ok((predicate, predicate_serialized))
203}
204
205#[recursive]
206fn create_physical_plan_impl(
207    root: Node,
208    lp_arena: &mut Arena<IR>,
209    expr_arena: &mut Arena<AExpr>,
210    state: &mut ConversionState,
211    // Cache nodes in order of discovery
212    cache_nodes: &mut PlIndexMap<UniqueId, Box<executors::CacheExec>>,
213    build_streaming_executor: Option<StreamingExecutorBuilder>,
214) -> PolarsResult<Box<dyn Executor>> {
215    use IR::*;
216
217    macro_rules! recurse {
218        ($node:expr, $state: expr) => {
219            create_physical_plan_impl(
220                $node,
221                lp_arena,
222                expr_arena,
223                $state,
224                cache_nodes,
225                build_streaming_executor,
226            )
227        };
228    }
229
230    let logical_plan = if state.has_cache_parent
231        || matches!(
232            lp_arena.get(root),
233            IR::Scan { .. } // Needed for the streaming impl
234                | IR::Cache { .. } // Needed for plans branching from the same cache node
235                | IR::Sink { // Needed for the streaming impl
236                    payload: SinkTypeIR::Partition(_),
237                    ..
238                }
239        ) {
240        lp_arena.get(root).clone()
241    } else {
242        lp_arena.take(root)
243    };
244
245    match logical_plan {
246        #[cfg(feature = "python")]
247        PythonScan { mut options } => {
248            let mut expr_conv_state = ExpressionConversionState::new(true);
249            let (predicate, predicate_serialized) =
250                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
251            Ok(Box::new(executors::PythonScanExec {
252                options,
253                predicate,
254                predicate_serialized,
255            }))
256        },
257        Sink { input, payload } => {
258            let input = recurse!(input, state)?;
259            match payload {
260                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
261                    input,
262                    name: "mem".to_string(),
263                    f: Box::new(move |df, _state| Ok(Some(df))),
264                })),
265                SinkTypeIR::File(FileSinkType {
266                    file_type,
267                    target,
268                    sink_options,
269                    cloud_options,
270                }) => {
271                    let name = sink_name(&file_type).to_owned();
272                    Ok(Box::new(SinkExecutor {
273                        input,
274                        name,
275                        f: Box::new(move |mut df, _state| {
276                            let mut file = target
277                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
278                            let writer = &mut *file;
279
280                            use std::io::BufWriter;
281                            match &file_type {
282                                #[cfg(feature = "parquet")]
283                                FileType::Parquet(options) => {
284                                    use polars_io::parquet::write::ParquetWriter;
285                                    ParquetWriter::new(BufWriter::new(writer))
286                                        .with_compression(options.compression)
287                                        .with_statistics(options.statistics)
288                                        .with_row_group_size(options.row_group_size)
289                                        .with_data_page_size(options.data_page_size)
290                                        .with_key_value_metadata(options.key_value_metadata.clone())
291                                        .finish(&mut df)?;
292                                },
293                                #[cfg(feature = "ipc")]
294                                FileType::Ipc(options) => {
295                                    use polars_io::SerWriter;
296                                    use polars_io::ipc::IpcWriter;
297                                    IpcWriter::new(BufWriter::new(writer))
298                                        .with_compression(options.compression)
299                                        .with_compat_level(options.compat_level)
300                                        .finish(&mut df)?;
301                                },
302                                #[cfg(feature = "csv")]
303                                FileType::Csv(options) => {
304                                    use polars_io::SerWriter;
305                                    use polars_io::csv::write::CsvWriter;
306                                    CsvWriter::new(BufWriter::new(writer))
307                                        .include_bom(options.include_bom)
308                                        .include_header(options.include_header)
309                                        .with_separator(options.serialize_options.separator)
310                                        .with_line_terminator(
311                                            options.serialize_options.line_terminator.clone(),
312                                        )
313                                        .with_quote_char(options.serialize_options.quote_char)
314                                        .with_batch_size(options.batch_size)
315                                        .with_datetime_format(
316                                            options.serialize_options.datetime_format.clone(),
317                                        )
318                                        .with_date_format(
319                                            options.serialize_options.date_format.clone(),
320                                        )
321                                        .with_time_format(
322                                            options.serialize_options.time_format.clone(),
323                                        )
324                                        .with_float_scientific(
325                                            options.serialize_options.float_scientific,
326                                        )
327                                        .with_float_precision(
328                                            options.serialize_options.float_precision,
329                                        )
330                                        .with_decimal_comma(options.serialize_options.decimal_comma)
331                                        .with_null_value(options.serialize_options.null.clone())
332                                        .with_quote_style(options.serialize_options.quote_style)
333                                        .finish(&mut df)?;
334                                },
335                                #[cfg(feature = "json")]
336                                FileType::Json(_options) => {
337                                    use polars_io::SerWriter;
338                                    use polars_io::json::{JsonFormat, JsonWriter};
339
340                                    JsonWriter::new(BufWriter::new(writer))
341                                        .with_json_format(JsonFormat::JsonLines)
342                                        .finish(&mut df)?;
343                                },
344                                #[allow(unreachable_patterns)]
345                                _ => panic!("enable filetype feature"),
346                            }
347
348                            file.sync_on_close(sink_options.sync_on_close)?;
349                            file.close()?;
350
351                            Ok(None)
352                        }),
353                    }))
354                },
355                SinkTypeIR::Partition(_) => {
356                    let builder = build_streaming_executor
357                        .expect("invalid build. Missing feature new-streaming");
358
359                    let executor = Box::new(PartitionedSinkExecutor::new(
360                        input, builder, root, lp_arena, expr_arena,
361                    ));
362
363                    let id = UniqueId::new();
364
365                    // Use cache so that this runs during the cache pre-filling stage and not on the
366                    // thread pool, it could deadlock since the streaming engine uses the thread
367                    // pool internally.
368                    let existing = cache_nodes.insert(
369                        id,
370                        Box::new(executors::CacheExec {
371                            input: Some(executor),
372                            id,
373                            count: 0,
374                            is_new_streaming_scan: false,
375                        }),
376                    );
377
378                    assert!(existing.is_none());
379
380                    Ok(Box::new(executors::CacheExec {
381                        id,
382                        // Rest of the fields don't matter - the actual node was inserted into
383                        // `cache_nodes`.
384                        input: None,
385                        count: Default::default(),
386                        is_new_streaming_scan: false,
387                    }))
388                },
389            }
390        },
391        SinkMultiple { .. } => {
392            unreachable!("should be handled with create_multiple_physical_plans")
393        },
394        Union { inputs, options } => {
395            let inputs = state.with_new_branch(|new_state| {
396                inputs
397                    .into_iter()
398                    .map(|node| recurse!(node, new_state))
399                    .collect::<PolarsResult<Vec<_>>>()
400            });
401            let inputs = inputs?;
402            Ok(Box::new(executors::UnionExec { inputs, options }))
403        },
404        HConcat {
405            inputs, options, ..
406        } => {
407            let inputs = state.with_new_branch(|new_state| {
408                inputs
409                    .into_iter()
410                    .map(|node| recurse!(node, new_state))
411                    .collect::<PolarsResult<Vec<_>>>()
412            });
413
414            let inputs = inputs?;
415
416            Ok(Box::new(executors::HConcatExec { inputs, options }))
417        },
418        Slice { input, offset, len } => {
419            let input = recurse!(input, state)?;
420            Ok(Box::new(executors::SliceExec { input, offset, len }))
421        },
422        Filter { input, predicate } => {
423            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
424            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
425            let input = recurse!(input, state)?;
426            let mut state = ExpressionConversionState::new(true);
427            let predicate = create_physical_expr(
428                &predicate,
429                Context::Default,
430                expr_arena,
431                &input_schema,
432                &mut state,
433            )?;
434            Ok(Box::new(executors::FilterExec::new(
435                predicate,
436                input,
437                state.has_windows,
438                streamable,
439            )))
440        },
441        #[allow(unused_variables)]
442        Scan {
443            sources,
444            file_info,
445            hive_parts,
446            output_schema,
447            scan_type,
448            predicate,
449            unified_scan_args,
450        } => {
451            let mut expr_conversion_state = ExpressionConversionState::new(true);
452
453            let mut create_skip_batch_predicate = false;
454            #[cfg(feature = "parquet")]
455            {
456                create_skip_batch_predicate |= matches!(
457                    &*scan_type,
458                    FileScanIR::Parquet {
459                        options: polars_io::prelude::ParquetOptions {
460                            use_statistics: true,
461                            ..
462                        },
463                        ..
464                    }
465                );
466            }
467
468            let predicate = predicate
469                .map(|predicate| {
470                    create_scan_predicate(
471                        &predicate,
472                        expr_arena,
473                        output_schema.as_ref().unwrap_or(&file_info.schema),
474                        None, // hive_schema
475                        &mut expr_conversion_state,
476                        create_skip_batch_predicate,
477                        false,
478                    )
479                })
480                .transpose()?;
481
482            match *scan_type {
483                FileScanIR::Anonymous { function, .. } => {
484                    Ok(Box::new(executors::AnonymousScanExec {
485                        function,
486                        predicate,
487                        unified_scan_args,
488                        file_info,
489                        output_schema,
490                        predicate_has_windows: expr_conversion_state.has_windows,
491                    }))
492                },
493                #[allow(unreachable_patterns)]
494                _ => {
495                    // We wrap in a CacheExec so that the new-streaming scan gets called from the
496                    // CachePrefiller. This ensures it is called from outside of rayon to avoid
497                    // deadlocks.
498                    //
499                    // Note that we don't actually want it to be kept in memory after being used,
500                    // so we set the count to have it be dropped after a single use (or however
501                    // many times it is referenced after CSE (subplan)).
502                    state.has_cache_parent = true;
503                    state.has_cache_child = true;
504
505                    let build_func = build_streaming_executor
506                        .expect("invalid build. Missing feature new-streaming");
507
508                    let executor = build_func(root, lp_arena, expr_arena)?;
509
510                    // Generate a unique ID for this scan. Currently this scan can be visited only
511                    // once, since common subplans are always behind a cache, which prevents
512                    // multiple traversals of the common subplan.
513                    //
514                    // If this property changes in the future, the same scan could be visited
515                    // and executed multiple times. It is a responsibility of the caller to
516                    // insert a cache if multiple executions are not desirable.
517                    let id = UniqueId::new();
518
519                    let existing = cache_nodes.insert(
520                        id,
521                        Box::new(executors::CacheExec {
522                            input: Some(executor),
523                            id,
524                            // This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
525                            count: 0,
526                            is_new_streaming_scan: true,
527                        }),
528                    );
529
530                    assert!(existing.is_none());
531
532                    Ok(Box::new(executors::CacheExec {
533                        id,
534                        // Rest of the fields don't matter - the actual node was inserted into
535                        // `cache_nodes`.
536                        input: None,
537                        count: Default::default(),
538                        is_new_streaming_scan: true,
539                    }))
540                },
541                #[allow(unreachable_patterns)]
542                _ => unreachable!(),
543            }
544        },
545
546        Select {
547            expr,
548            input,
549            schema: _schema,
550            options,
551            ..
552        } => {
553            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
554            let input = recurse!(input, state)?;
555            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
556            let phys_expr = create_physical_expressions_from_irs(
557                &expr,
558                Context::Default,
559                expr_arena,
560                &input_schema,
561                &mut state,
562            )?;
563
564            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))
565                // If all columns are literal we would get a 1 row per thread.
566                && !phys_expr.iter().all(|p| {
567                    p.is_literal()
568                });
569
570            Ok(Box::new(executors::ProjectionExec {
571                input,
572                expr: phys_expr,
573                has_windows: state.has_windows,
574                input_schema,
575                #[cfg(test)]
576                schema: _schema,
577                options,
578                allow_vertical_parallelism,
579            }))
580        },
581        DataFrameScan {
582            df, output_schema, ..
583        } => Ok(Box::new(executors::DataFrameExec {
584            df,
585            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
586        })),
587        Sort {
588            input,
589            by_column,
590            slice,
591            sort_options,
592        } => {
593            debug_assert!(!by_column.is_empty());
594            let input_schema = lp_arena.get(input).schema(lp_arena);
595            let by_column = create_physical_expressions_from_irs(
596                &by_column,
597                Context::Default,
598                expr_arena,
599                input_schema.as_ref(),
600                &mut ExpressionConversionState::new(true),
601            )?;
602            let input = recurse!(input, state)?;
603            Ok(Box::new(executors::SortExec {
604                input,
605                by_column,
606                slice,
607                sort_options,
608            }))
609        },
610        Cache {
611            input,
612            id,
613            cache_hits,
614        } => {
615            state.has_cache_parent = true;
616            state.has_cache_child = true;
617
618            if !cache_nodes.contains_key(&id) {
619                let input = recurse!(input, state)?;
620
621                let cache = Box::new(executors::CacheExec {
622                    id,
623                    input: Some(input),
624                    count: cache_hits,
625                    is_new_streaming_scan: false,
626                });
627
628                cache_nodes.insert(id, cache);
629            }
630
631            Ok(Box::new(executors::CacheExec {
632                id,
633                input: None,
634                count: cache_hits,
635                is_new_streaming_scan: false,
636            }))
637        },
638        Distinct { input, options } => {
639            let input = recurse!(input, state)?;
640            Ok(Box::new(executors::UniqueExec { input, options }))
641        },
642        GroupBy {
643            input,
644            keys,
645            aggs,
646            apply,
647            schema,
648            maintain_order,
649            options,
650        } => {
651            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
652            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
653            let phys_keys = create_physical_expressions_from_irs(
654                &keys,
655                Context::Default,
656                expr_arena,
657                &input_schema,
658                &mut ExpressionConversionState::new(true),
659            )?;
660            let phys_aggs = create_physical_expressions_from_irs(
661                &aggs,
662                Context::Aggregation,
663                expr_arena,
664                &input_schema,
665                &mut ExpressionConversionState::new(true),
666            )?;
667
668            let _slice = options.slice;
669            #[cfg(feature = "dynamic_group_by")]
670            if let Some(options) = options.dynamic {
671                let input = recurse!(input, state)?;
672                return Ok(Box::new(executors::GroupByDynamicExec {
673                    input,
674                    keys: phys_keys,
675                    aggs: phys_aggs,
676                    options,
677                    input_schema,
678                    slice: _slice,
679                    apply,
680                }));
681            }
682
683            #[cfg(feature = "dynamic_group_by")]
684            if let Some(options) = options.rolling {
685                let input = recurse!(input, state)?;
686                return Ok(Box::new(executors::GroupByRollingExec {
687                    input,
688                    keys: phys_keys,
689                    aggs: phys_aggs,
690                    options,
691                    input_schema,
692                    slice: _slice,
693                    apply,
694                }));
695            }
696
697            // We first check if we can partition the group_by on the latest moment.
698            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
699            if partitionable {
700                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
701                    if let Union { options, .. } = lp {
702                        options.from_partitioned_ds
703                    } else {
704                        false
705                    }
706                });
707                let input = recurse!(input, state)?;
708                let keys = keys
709                    .iter()
710                    .map(|e| e.to_expr(expr_arena))
711                    .collect::<Vec<_>>();
712                let aggs = aggs
713                    .iter()
714                    .map(|e| e.to_expr(expr_arena))
715                    .collect::<Vec<_>>();
716                Ok(Box::new(executors::PartitionGroupByExec::new(
717                    input,
718                    phys_keys,
719                    phys_aggs,
720                    maintain_order,
721                    options.slice,
722                    input_schema,
723                    schema,
724                    from_partitioned_ds,
725                    keys,
726                    aggs,
727                )))
728            } else {
729                let input = recurse!(input, state)?;
730                Ok(Box::new(executors::GroupByExec::new(
731                    input,
732                    phys_keys,
733                    phys_aggs,
734                    apply,
735                    maintain_order,
736                    input_schema,
737                    options.slice,
738                )))
739            }
740        },
741        Join {
742            input_left,
743            input_right,
744            left_on,
745            right_on,
746            options,
747            schema,
748            ..
749        } => {
750            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
751            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();
752
753            let (input_left, input_right) = state.with_new_branch(|new_state| {
754                (
755                    recurse!(input_left, new_state),
756                    recurse!(input_right, new_state),
757                )
758            });
759            let input_left = input_left?;
760            let input_right = input_right?;
761
762            // Todo! remove the force option. It can deadlock.
763            let parallel = if options.force_parallel {
764                true
765            } else {
766                options.allow_parallel
767            };
768
769            let left_on = create_physical_expressions_from_irs(
770                &left_on,
771                Context::Default,
772                expr_arena,
773                &schema_left,
774                &mut ExpressionConversionState::new(true),
775            )?;
776            let right_on = create_physical_expressions_from_irs(
777                &right_on,
778                Context::Default,
779                expr_arena,
780                &schema_right,
781                &mut ExpressionConversionState::new(true),
782            )?;
783            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
784
785            // Convert the join options, to the physical join options. This requires the physical
786            // planner, so we do this last minute.
787            let join_type_options = options
788                .options
789                .map(|o| {
790                    o.compile(|e| {
791                        let phys_expr = create_physical_expr(
792                            e,
793                            Context::Default,
794                            expr_arena,
795                            &schema,
796                            &mut ExpressionConversionState::new(false),
797                        )?;
798
799                        let execution_state = ExecutionState::default();
800
801                        Ok(Arc::new(move |df: DataFrame| {
802                            let mask = phys_expr.evaluate(&df, &execution_state)?;
803                            let mask = mask.as_materialized_series();
804                            let mask = mask.bool()?;
805                            df._filter_seq(mask)
806                        }))
807                    })
808                })
809                .transpose()?;
810
811            Ok(Box::new(executors::JoinExec::new(
812                input_left,
813                input_right,
814                left_on,
815                right_on,
816                parallel,
817                options.args,
818                join_type_options,
819            )))
820        },
821        HStack {
822            input,
823            exprs,
824            schema: output_schema,
825            options,
826        } => {
827            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
828            let input = recurse!(input, state)?;
829
830            let allow_vertical_parallelism = options.should_broadcast
831                && exprs
832                    .iter()
833                    .all(|e| is_elementwise_rec(e.node(), expr_arena));
834
835            let mut state =
836                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());
837
838            let phys_exprs = create_physical_expressions_from_irs(
839                &exprs,
840                Context::Default,
841                expr_arena,
842                &input_schema,
843                &mut state,
844            )?;
845            Ok(Box::new(executors::StackExec {
846                input,
847                has_windows: state.has_windows,
848                exprs: phys_exprs,
849                input_schema,
850                output_schema,
851                options,
852                allow_vertical_parallelism,
853            }))
854        },
855        MapFunction {
856            input, function, ..
857        } => {
858            let input = recurse!(input, state)?;
859            Ok(Box::new(executors::UdfExec { input, function }))
860        },
861        ExtContext {
862            input, contexts, ..
863        } => {
864            let input = recurse!(input, state)?;
865            let contexts = contexts
866                .into_iter()
867                .map(|node| recurse!(node, state))
868                .collect::<PolarsResult<_>>()?;
869            Ok(Box::new(executors::ExternalContext { input, contexts }))
870        },
871        SimpleProjection { input, columns } => {
872            let input = recurse!(input, state)?;
873            let exec = executors::ProjectionSimple { input, columns };
874            Ok(Box::new(exec))
875        },
876        #[cfg(feature = "merge_sorted")]
877        MergeSorted {
878            input_left,
879            input_right,
880            key,
881        } => {
882            let (input_left, input_right) = state.with_new_branch(|new_state| {
883                (
884                    recurse!(input_left, new_state),
885                    recurse!(input_right, new_state),
886                )
887            });
888            let input_left = input_left?;
889            let input_right = input_right?;
890
891            let exec = executors::MergeSorted {
892                input_left,
893                input_right,
894                key,
895            };
896            Ok(Box::new(exec))
897        },
898        Invalid => unreachable!(),
899    }
900}
901
902pub fn create_scan_predicate(
903    predicate: &ExprIR,
904    expr_arena: &mut Arena<AExpr>,
905    schema: &Arc<Schema>,
906    hive_schema: Option<&Schema>,
907    state: &mut ExpressionConversionState,
908    create_skip_batch_predicate: bool,
909    create_column_predicates: bool,
910) -> PolarsResult<ScanPredicate> {
911    let mut predicate = predicate.clone();
912
913    let mut hive_predicate = None;
914    let mut hive_predicate_is_full_predicate = false;
915
916    #[expect(clippy::never_loop)]
917    loop {
918        let Some(hive_schema) = hive_schema else {
919            break;
920        };
921
922        let mut hive_predicate_parts = vec![];
923        let mut non_hive_predicate_parts = vec![];
924
925        for predicate_part in MintermIter::new(predicate.node(), expr_arena) {
926            if aexpr_to_leaf_names_iter(predicate_part, expr_arena)
927                .all(|name| hive_schema.contains(&name))
928            {
929                hive_predicate_parts.push(predicate_part)
930            } else {
931                non_hive_predicate_parts.push(predicate_part)
932            }
933        }
934
935        if hive_predicate_parts.is_empty() {
936            break;
937        }
938
939        if non_hive_predicate_parts.is_empty() {
940            hive_predicate_is_full_predicate = true;
941            break;
942        }
943
944        {
945            let mut iter = hive_predicate_parts.into_iter();
946            let mut node = iter.next().unwrap();
947
948            for next_node in iter {
949                node = expr_arena.add(AExpr::BinaryExpr {
950                    left: node,
951                    op: Operator::And,
952                    right: next_node,
953                });
954            }
955
956            hive_predicate = Some(create_physical_expr(
957                &ExprIR::from_node(node, expr_arena),
958                Context::Default,
959                expr_arena,
960                schema,
961                state,
962            )?)
963        }
964
965        {
966            let mut iter = non_hive_predicate_parts.into_iter();
967            let mut node = iter.next().unwrap();
968
969            for next_node in iter {
970                node = expr_arena.add(AExpr::BinaryExpr {
971                    left: node,
972                    op: Operator::And,
973                    right: next_node,
974                });
975            }
976
977            predicate = ExprIR::from_node(node, expr_arena);
978        }
979
980        break;
981    }
982
983    let phys_predicate =
984        create_physical_expr(&predicate, Context::Default, expr_arena, schema, state)?;
985
986    if hive_predicate_is_full_predicate {
987        hive_predicate = Some(phys_predicate.clone());
988    }
989
990    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
991        predicate.node(),
992        expr_arena,
993    )));
994
995    let mut skip_batch_predicate = None;
996
997    if create_skip_batch_predicate {
998        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
999            let expr = ExprIR::new(node, predicate.output_name_inner().clone());
1000
1001            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
1002                eprintln!("predicate: {}", predicate.display(expr_arena));
1003                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
1004            }
1005
1006            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());
1007
1008            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
1009            for (col, dtype) in schema.iter() {
1010                if !live_columns.contains(col) {
1011                    continue;
1012                }
1013
1014                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
1015                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
1016                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
1017            }
1018
1019            skip_batch_predicate = Some(create_physical_expr(
1020                &expr,
1021                Context::Default,
1022                expr_arena,
1023                &Arc::new(skip_batch_schema),
1024                state,
1025            )?);
1026        }
1027    }
1028
1029    let column_predicates = if create_column_predicates {
1030        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
1031        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
1032            eprintln!("column_predicates: {{");
1033            eprintln!("  [");
1034            for (pred, spec) in column_predicates.predicates.values() {
1035                eprintln!(
1036                    "    {} ({spec:?}),",
1037                    ExprIRDisplay::display_node(*pred, expr_arena)
1038                );
1039            }
1040            eprintln!("  ],");
1041            eprintln!(
1042                "  is_sumwise_complete: {}",
1043                column_predicates.is_sumwise_complete
1044            );
1045            eprintln!("}}");
1046        }
1047        PhysicalColumnPredicates {
1048            predicates: column_predicates
1049                .predicates
1050                .into_iter()
1051                .map(|(n, (p, s))| {
1052                    PolarsResult::Ok((
1053                        n,
1054                        (
1055                            create_physical_expr(
1056                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
1057                                Context::Default,
1058                                expr_arena,
1059                                schema,
1060                                state,
1061                            )?,
1062                            s,
1063                        ),
1064                    ))
1065                })
1066                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
1067            is_sumwise_complete: column_predicates.is_sumwise_complete,
1068        }
1069    } else {
1070        PhysicalColumnPredicates {
1071            predicates: PlHashMap::default(),
1072            is_sumwise_complete: false,
1073        }
1074    };
1075
1076    PolarsResult::Ok(ScanPredicate {
1077        predicate: phys_predicate,
1078        live_columns,
1079        skip_batch_predicate,
1080        column_predicates,
1081        hive_predicate,
1082        hive_predicate_is_full_predicate,
1083    })
1084}
1085
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        // Building plans for two sinks that share one cache node must not
        // panic: CSE produces duplicate cache nodes with the same ID, while
        // cloud plans reuse the very same node instead.

        let mut ir_arena = Arena::new();

        let df_schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan_node = ir_arena.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&df_schema)),
            schema: Arc::new(df_schema),
            output_schema: None,
        });

        let cache_node = ir_arena.add(IR::Cache {
            input: scan_node,
            id: UniqueId::new(),
            cache_hits: 1,
        });

        // Two memory sinks, both fed by the same cache node.
        let sinks: Vec<_> = (0..2)
            .map(|_| {
                ir_arena.add(IR::Sink {
                    input: cache_node,
                    payload: SinkTypeIR::Memory,
                })
            })
            .collect();

        let _multiplan =
            create_multiple_physical_plans(&sinks, &mut ir_arena, &mut Arena::new(), None)
                .unwrap();
    }
}