polars_mem_engine/planner/
lp.rs

1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::global::_set_n_rows_for_scan;
5use polars_plan::plans::expr_ir::ExprIR;
6use polars_utils::format_pl_smallstr;
7use recursive::recursive;
8
9use self::expr_ir::OutputName;
10use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
11#[cfg(feature = "python")]
12use self::python_dsl::PythonScanSource;
13use super::super::executors::{self, Executor};
14use super::*;
15use crate::ScanPredicate;
16use crate::executors::{CachePrefiller, SinkExecutor};
17use crate::predicate::PhysicalColumnPredicates;
18
/// Signature of the callback that builds an [`Executor`] for the plan rooted at a node.
///
/// It receives the root node together with the IR arena and the expression arena. In this
/// file it is invoked for `Scan` nodes that are not `FileScan::Anonymous`.
19pub type StreamingExecutorBuilder =
20    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
21
22fn partitionable_gb(
23    keys: &[ExprIR],
24    aggs: &[ExprIR],
25    input_schema: &Schema,
26    expr_arena: &Arena<AExpr>,
27    apply: &Option<Arc<dyn DataFrameUdf>>,
28) -> bool {
29    // checks:
30    //      1. complex expressions in the group_by itself are also not partitionable
31    //          in this case anything more than col("foo")
32    //      2. a custom function cannot be partitioned
33    //      3. we don't bother with more than 2 keys, as the cardinality likely explodes
34    //         by the combinations
35    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
36        // complex expressions in the group_by itself are also not partitionable
37        // in this case anything more than col("foo")
38        for key in keys {
39            if (expr_arena).iter(key.node()).count() > 1
40                || has_aexpr(key.node(), expr_arena, |ae| match ae {
41                    AExpr::Literal(lv) => !lv.is_scalar(),
42                    _ => false,
43                })
44            {
45                return false;
46            }
47        }
48
49        can_pre_agg_exprs(aggs, expr_arena, input_schema)
50    } else {
51        false
52    }
53}
54
55#[derive(Clone)]
56struct ConversionState {
57    has_cache_child: bool,
58    has_cache_parent: bool,
59}
60
61impl ConversionState {
62    fn new() -> PolarsResult<Self> {
63        Ok(ConversionState {
64            has_cache_child: false,
65            has_cache_parent: false,
66        })
67    }
68
69    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
70        let mut new_state = self.clone();
71        new_state.has_cache_child = false;
72        let out = func(&mut new_state);
73        self.has_cache_child = new_state.has_cache_child;
74        out
75    }
76}
77
78pub fn create_physical_plan(
79    root: Node,
80    lp_arena: &mut Arena<IR>,
81    expr_arena: &mut Arena<AExpr>,
82    build_streaming_executor: Option<StreamingExecutorBuilder>,
83) -> PolarsResult<Box<dyn Executor>> {
84    let mut state = ConversionState::new()?;
85    let mut cache_nodes = Default::default();
86    let plan = create_physical_plan_impl(
87        root,
88        lp_arena,
89        expr_arena,
90        &mut state,
91        &mut cache_nodes,
92        build_streaming_executor,
93    )?;
94
95    if cache_nodes.is_empty() {
96        Ok(plan)
97    } else {
98        Ok(Box::new(CachePrefiller {
99            caches: cache_nodes,
100            phys_plan: plan,
101        }))
102    }
103}
104
105pub struct MultiplePhysicalPlans {
106    pub cache_prefiller: Option<Box<dyn Executor>>,
107    pub physical_plans: Vec<Box<dyn Executor>>,
108}
109pub fn create_multiple_physical_plans(
110    roots: &[Node],
111    lp_arena: &mut Arena<IR>,
112    expr_arena: &mut Arena<AExpr>,
113    build_streaming_executor: Option<StreamingExecutorBuilder>,
114) -> PolarsResult<MultiplePhysicalPlans> {
115    let mut state = ConversionState::new()?;
116    let mut cache_nodes = Default::default();
117    let plans = state.with_new_branch(|new_state| {
118        roots
119            .iter()
120            .map(|&node| {
121                create_physical_plan_impl(
122                    node,
123                    lp_arena,
124                    expr_arena,
125                    new_state,
126                    &mut cache_nodes,
127                    build_streaming_executor,
128                )
129            })
130            .collect::<PolarsResult<Vec<_>>>()
131    })?;
132
133    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
134        struct Empty;
135        impl Executor for Empty {
136            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
137                Ok(DataFrame::empty())
138            }
139        }
140        Box::new(CachePrefiller {
141            caches: cache_nodes,
142            phys_plan: Box::new(Empty),
143        }) as _
144    });
145
146    Ok(MultiplePhysicalPlans {
147        cache_prefiller,
148        physical_plans: plans,
149    })
150}
151
/// Prepares the predicate of a Python scan for execution.
///
/// Returns `(physical_predicate, serialized_predicate)`:
/// * If `options.predicate` is not `PythonPredicate::Polars`, both are `None`.
/// * For a pyarrow source the predicate is first converted to a pyarrow eval string. On
///   success `options.predicate` is replaced by `PythonPredicate::PyArrow` and both results
///   are `None`; otherwise only the physical expression is returned.
/// * For any other source the predicate is serialized and the physical expression is
///   returned alongside the result of the serialization.
///
/// # Errors
/// Fails if serializing the predicate or building the physical expression fails.
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let PythonPredicate::Polars(e) = &options.predicate else {
        return Ok((None, None));
    };

    let mut predicate_serialized = None;

    if matches!(options.python_source, PythonScanSource::Pyarrow) {
        // Convert to a pyarrow eval string.
        let pa_filter = polars_plan::plans::python::pyarrow::predicate_to_pa(
            e.node(),
            expr_arena,
            Default::default(),
        );
        if let Some(eval_str) = pa_filter {
            options.predicate = PythonPredicate::PyArrow(eval_str);
            // We don't have to use a physical expression as pyarrow deals with the filter.
            return Ok((None, None));
        }
    } else {
        let dsl_expr = e.to_expr(expr_arena);
        predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;
    }

    // Convert to physical expression for the case the reader cannot consume the predicate.
    let predicate = create_physical_expr(
        e,
        Context::Default,
        expr_arena,
        &options.schema,
        state,
    )?;

    Ok((Some(predicate), predicate_serialized))
}
203
/// Recursively converts the IR node `root` and its inputs into a tree of boxed [`Executor`]s.
///
/// The node is cloned out of `lp_arena` when a `Cache` node has already been visited
/// (`state.has_cache_parent`) or when it is a `Scan`; otherwise it is taken from the arena.
/// Executors for the inputs of `Cache` nodes are stored in `cache_nodes`, keyed by cache id;
/// the returned tree only contains input-less `CacheExec` placeholders for them.
///
/// `build_streaming_executor` is only called for `Scan` nodes that are not
/// `FileScan::Anonymous`.
///
/// # Errors
/// Returns an error when converting an input or building a physical expression fails, and
/// `InvalidOperation` for `SinkTypeIR::Partition`.
///
/// # Panics
/// Panics on `SinkMultiple` and `Invalid` nodes, on a non-anonymous `Scan` when
/// `build_streaming_executor` is `None`, and on a file sink whose file type does not match
/// any of the feature-gated arms.
204#[recursive]
205fn create_physical_plan_impl(
206    root: Node,
207    lp_arena: &mut Arena<IR>,
208    expr_arena: &mut Arena<AExpr>,
209    state: &mut ConversionState,
210    // Cache nodes in order of discovery
211    cache_nodes: &mut PlIndexMap<usize, Box<dyn Executor>>,
212    build_streaming_executor: Option<StreamingExecutorBuilder>,
213) -> PolarsResult<Box<dyn Executor>> {
214    use IR::*;
215
    // Shorthand for recursing with the shared arenas, cache map and streaming builder.
216    macro_rules! recurse {
217        ($node:expr, $state: expr) => {
218            create_physical_plan_impl(
219                $node,
220                lp_arena,
221                expr_arena,
222                $state,
223                cache_nodes,
224                build_streaming_executor,
225            )
226        };
227    }
228
    // Scans are cloned because the streaming builder in the `Scan` arm reads `root` from the
    // arena again. Below a cache the node is cloned as well, presumably because the same
    // sub-plan can be reached more than once — TODO confirm.
229    let logical_plan = if state.has_cache_parent || matches!(lp_arena.get(root), IR::Scan { .. }) {
230        lp_arena.get(root).clone()
231    } else {
232        lp_arena.take(root)
233    };
234
235    match logical_plan {
236        #[cfg(feature = "python")]
237        PythonScan { mut options } => {
238            let mut expr_conv_state = ExpressionConversionState::new(true);
239            let (predicate, predicate_serialized) =
240                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
241            Ok(Box::new(executors::PythonScanExec {
242                options,
243                predicate,
244                predicate_serialized,
245            }))
246        },
247        Sink { input, payload } => {
248            let input = recurse!(input, state)?;
249            match payload {
                // In-memory sink: hand the frame straight back.
250                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
251                    input,
252                    name: "mem".to_string(),
253                    f: Box::new(move |df, _state| Ok(Some(df))),
254                })),
255                SinkTypeIR::File(FileSinkType {
256                    file_type,
257                    target,
258                    sink_options,
259                    cloud_options,
260                }) => {
261                    let name: &'static str = match &file_type {
262                        #[cfg(feature = "parquet")]
263                        FileType::Parquet(_) => "parquet",
264                        #[cfg(feature = "ipc")]
265                        FileType::Ipc(_) => "ipc",
266                        #[cfg(feature = "csv")]
267                        FileType::Csv(_) => "csv",
268                        #[cfg(feature = "json")]
269                        FileType::Json(_) => "json",
270                        #[allow(unreachable_patterns)]
271                        _ => panic!("enable filetype feature"),
272                    };
273
                    // The closure opens the target, writes the frame with the writer matching
                    // `file_type`, then syncs and closes the file. It returns `Ok(None)`.
274                    Ok(Box::new(SinkExecutor {
275                        input,
276                        name: name.to_string(),
277                        f: Box::new(move |mut df, _state| {
278                            let mut file = target
279                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
280                            let writer = &mut *file;
281
282                            use std::io::BufWriter;
283                            match &file_type {
284                                #[cfg(feature = "parquet")]
285                                FileType::Parquet(options) => {
286                                    use polars_io::parquet::write::ParquetWriter;
287                                    ParquetWriter::new(BufWriter::new(writer))
288                                        .with_compression(options.compression)
289                                        .with_statistics(options.statistics)
290                                        .with_row_group_size(options.row_group_size)
291                                        .with_data_page_size(options.data_page_size)
292                                        .finish(&mut df)?;
293                                },
294                                #[cfg(feature = "ipc")]
295                                FileType::Ipc(options) => {
296                                    use polars_io::SerWriter;
297                                    use polars_io::ipc::IpcWriter;
298                                    IpcWriter::new(BufWriter::new(writer))
299                                        .with_compression(options.compression)
300                                        .with_compat_level(options.compat_level)
301                                        .finish(&mut df)?;
302                                },
303                                #[cfg(feature = "csv")]
304                                FileType::Csv(options) => {
305                                    use polars_io::SerWriter;
306                                    use polars_io::csv::write::CsvWriter;
307                                    CsvWriter::new(BufWriter::new(writer))
308                                        .include_bom(options.include_bom)
309                                        .include_header(options.include_header)
310                                        .with_separator(options.serialize_options.separator)
311                                        .with_line_terminator(
312                                            options.serialize_options.line_terminator.clone(),
313                                        )
314                                        .with_quote_char(options.serialize_options.quote_char)
315                                        .with_batch_size(options.batch_size)
316                                        .with_datetime_format(
317                                            options.serialize_options.datetime_format.clone(),
318                                        )
319                                        .with_date_format(
320                                            options.serialize_options.date_format.clone(),
321                                        )
322                                        .with_time_format(
323                                            options.serialize_options.time_format.clone(),
324                                        )
325                                        .with_float_scientific(
326                                            options.serialize_options.float_scientific,
327                                        )
328                                        .with_float_precision(
329                                            options.serialize_options.float_precision,
330                                        )
331                                        .with_null_value(options.serialize_options.null.clone())
332                                        .with_quote_style(options.serialize_options.quote_style)
333                                        .finish(&mut df)?;
334                                },
335                                #[cfg(feature = "json")]
336                                FileType::Json(_options) => {
337                                    use polars_io::SerWriter;
338                                    use polars_io::json::{JsonFormat, JsonWriter};
339
340                                    JsonWriter::new(BufWriter::new(writer))
341                                        .with_json_format(JsonFormat::JsonLines)
342                                        .finish(&mut df)?;
343                                },
344                                #[allow(unreachable_patterns)]
345                                _ => panic!("enable filetype feature"),
346                            }
347
348                            file.sync_on_close(sink_options.sync_on_close)?;
349                            file.close()?;
350
351                            Ok(None)
352                        }),
353                    }))
354                },
355
356                SinkTypeIR::Partition { .. } => {
357                    polars_bail!(InvalidOperation:
358                        "partition sinks not yet supported in standard engine."
359                    )
360                },
361            }
362        },
363        SinkMultiple { .. } => {
364            unreachable!("should be handled with create_multiple_physical_plans")
365        },
366        Union { inputs, options } => {
            // All inputs are converted inside one fresh branch of the conversion state.
367            let inputs = state.with_new_branch(|new_state| {
368                inputs
369                    .into_iter()
370                    .map(|node| recurse!(node, new_state))
371                    .collect::<PolarsResult<Vec<_>>>()
372            });
373            let inputs = inputs?;
374            Ok(Box::new(executors::UnionExec { inputs, options }))
375        },
376        HConcat {
377            inputs, options, ..
378        } => {
379            let inputs = state.with_new_branch(|new_state| {
380                inputs
381                    .into_iter()
382                    .map(|node| recurse!(node, new_state))
383                    .collect::<PolarsResult<Vec<_>>>()
384            });
385
386            let inputs = inputs?;
387
388            Ok(Box::new(executors::HConcatExec { inputs, options }))
389        },
390        Slice { input, offset, len } => {
391            let input = recurse!(input, state)?;
392            Ok(Box::new(executors::SliceExec { input, offset, len }))
393        },
394        Filter { input, predicate } => {
            // `streamable` requires an elementwise predicate and, additionally, either no
            // categorical column in the input schema or an enabled global string cache.
395            let mut streamable =
396                is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
            // The schema is resolved before recursing: the recursion may take the input node
            // out of the arena.
397            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
398            if streamable {
399                // This can cause problems with string caches
400                streamable = !input_schema
401                    .iter_values()
402                    .any(|dt| dt.contains_categoricals())
403                    || {
404                        #[cfg(feature = "dtype-categorical")]
405                        {
406                            polars_core::using_string_cache()
407                        }
408
409                        #[cfg(not(feature = "dtype-categorical"))]
410                        {
411                            false
412                        }
413                    }
414            }
415            let input = recurse!(input, state)?;
            // Shadows the `ConversionState` with the expression conversion state from here on.
416            let mut state = ExpressionConversionState::new(true);
417            let predicate = create_physical_expr(
418                &predicate,
419                Context::Default,
420                expr_arena,
421                &input_schema,
422                &mut state,
423            )?;
424            Ok(Box::new(executors::FilterExec::new(
425                predicate,
426                input,
427                state.has_windows,
428                streamable,
429            )))
430        },
431        #[allow(unused_variables)]
432        Scan {
433            sources,
434            file_info,
435            hive_parts,
436            output_schema,
437            scan_type,
438            predicate,
439            mut unified_scan_args,
440        } => {
            // Pass the slice length through `_set_n_rows_for_scan`; without a slice, a length
            // returned by it becomes a positive slice starting at offset 0.
441            unified_scan_args.pre_slice = if let Some(mut slice) = unified_scan_args.pre_slice {
442                *slice.len_mut() = _set_n_rows_for_scan(Some(slice.len())).unwrap();
443                Some(slice)
444            } else {
445                _set_n_rows_for_scan(None)
446                    .map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len })
447            };
448
449            let mut state = ExpressionConversionState::new(true);
450
            // A skip-batch predicate is only requested for parquet scans with
            // `use_statistics: true`.
451            let mut create_skip_batch_predicate = false;
452            #[cfg(feature = "parquet")]
453            {
454                create_skip_batch_predicate |= matches!(
455                    &*scan_type,
456                    FileScan::Parquet {
457                        options: polars_io::prelude::ParquetOptions {
458                            use_statistics: true,
459                            ..
460                        },
461                        ..
462                    }
463                );
464            }
465
466            let predicate = predicate
467                .map(|predicate| {
468                    create_scan_predicate(
469                        &predicate,
470                        expr_arena,
471                        output_schema.as_ref().unwrap_or(&file_info.schema),
472                        &mut state,
473                        create_skip_batch_predicate,
474                        false,
475                    )
476                })
477                .transpose()?;
478
479            match *scan_type {
480                FileScan::Anonymous { function, .. } => {
481                    Ok(Box::new(executors::AnonymousScanExec {
482                        function,
483                        predicate,
484                        unified_scan_args,
485                        file_info,
486                        output_schema,
487                        predicate_has_windows: state.has_windows,
488                    }))
489                },
                // Every other scan type is delegated to the streaming builder, which gets
                // `root` and the arenas rather than the locals prepared above.
490                #[allow(unreachable_patterns)]
491                _ => {
492                    let build_func = build_streaming_executor
493                        .expect("invalid build. Missing feature new-streaming");
494                    return build_func(root, lp_arena, expr_arena);
495                },
                // NOTE(review): this second wildcard arm can never match; the wildcard arm
                // above already catches every remaining variant.
496                #[allow(unreachable_patterns)]
497                _ => unreachable!(),
498            }
499        },
500
501        Select {
502            expr,
503            input,
504            schema: _schema,
505            options,
506            ..
507        } => {
508            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
509            let input = recurse!(input, state)?;
510            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
511            let phys_expr = create_physical_expressions_from_irs(
512                &expr,
513                Context::Default,
514                expr_arena,
515                &input_schema,
516                &mut state,
517            )?;
518
519            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena))
520                // If all columns are literal we would get a 1 row per thread.
521                && !phys_expr.iter().all(|p| {
522                    p.is_literal()
523                });
524
525            Ok(Box::new(executors::ProjectionExec {
526                input,
527                expr: phys_expr,
528                has_windows: state.has_windows,
529                input_schema,
530                #[cfg(test)]
531                schema: _schema,
532                options,
533                allow_vertical_parallelism,
534            }))
535        },
536        DataFrameScan {
537            df, output_schema, ..
538        } => Ok(Box::new(executors::DataFrameExec {
539            df,
540            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
541        })),
542        Sort {
543            input,
544            by_column,
545            slice,
546            sort_options,
547        } => {
            // The sort expressions are built against the input schema before the input node
            // is converted.
548            let input_schema = lp_arena.get(input).schema(lp_arena);
549            let by_column = create_physical_expressions_from_irs(
550                &by_column,
551                Context::Default,
552                expr_arena,
553                input_schema.as_ref(),
554                &mut ExpressionConversionState::new(true),
555            )?;
556            let input = recurse!(input, state)?;
557            Ok(Box::new(executors::SortExec {
558                input,
559                by_column,
560                slice,
561                sort_options,
562            }))
563        },
564        Cache {
565            input,
566            id,
567            cache_hits,
568        } => {
569            state.has_cache_parent = true;
570            state.has_cache_child = true;
571
            // Only the first visit of a cache id converts its input and registers the owning
            // executor; every visit returns an input-less placeholder for the same id.
572            if !cache_nodes.contains_key(&id) {
573                let input = recurse!(input, state)?;
574
575                let cache = Box::new(executors::CacheExec {
576                    id,
577                    input: Some(input),
578                    count: cache_hits,
579                });
580
581                cache_nodes.insert(id, cache);
582            }
583
584            Ok(Box::new(executors::CacheExec {
585                id,
586                input: None,
587                count: cache_hits,
588            }))
589        },
590        Distinct { input, options } => {
591            let input = recurse!(input, state)?;
592            Ok(Box::new(executors::UniqueExec { input, options }))
593        },
594        GroupBy {
595            input,
596            keys,
597            aggs,
598            apply,
599            schema,
600            maintain_order,
601            options,
602        } => {
603            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            // Take ownership of the options when this is the only `Arc` handle, else clone.
604            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
605            let phys_keys = create_physical_expressions_from_irs(
606                &keys,
607                Context::Default,
608                expr_arena,
609                &input_schema,
610                &mut ExpressionConversionState::new(true),
611            )?;
612            let phys_aggs = create_physical_expressions_from_irs(
613                &aggs,
614                Context::Aggregation,
615                expr_arena,
616                &input_schema,
617                &mut ExpressionConversionState::new(true),
618            )?;
619
620            let _slice = options.slice;
621            #[cfg(feature = "dynamic_group_by")]
622            if let Some(options) = options.dynamic {
623                let input = recurse!(input, state)?;
624                return Ok(Box::new(executors::GroupByDynamicExec {
625                    input,
626                    keys: phys_keys,
627                    aggs: phys_aggs,
628                    options,
629                    input_schema,
630                    slice: _slice,
631                    apply,
632                }));
633            }
634
635            #[cfg(feature = "dynamic_group_by")]
636            if let Some(options) = options.rolling {
637                let input = recurse!(input, state)?;
638                return Ok(Box::new(executors::GroupByRollingExec {
639                    input,
640                    keys: phys_keys,
641                    aggs: phys_aggs,
642                    options,
643                    input_schema,
644                    slice: _slice,
645                    apply,
646                }));
647            }
648
649            // We first check if we can partition the group_by on the latest moment.
650            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
651            if partitionable {
                // Scan the input sub-plan for a `Union` flagged `from_partitioned_ds`. This
                // runs before the recursion, which may take nodes out of the arena.
652                let from_partitioned_ds = (&*lp_arena).iter(input).any(|(_, lp)| {
653                    if let Union { options, .. } = lp {
654                        options.from_partitioned_ds
655                    } else {
656                        false
657                    }
658                });
659                let input = recurse!(input, state)?;
660                let keys = keys
661                    .iter()
662                    .map(|e| e.to_expr(expr_arena))
663                    .collect::<Vec<_>>();
664                let aggs = aggs
665                    .iter()
666                    .map(|e| e.to_expr(expr_arena))
667                    .collect::<Vec<_>>();
668                Ok(Box::new(executors::PartitionGroupByExec::new(
669                    input,
670                    phys_keys,
671                    phys_aggs,
672                    maintain_order,
673                    options.slice,
674                    input_schema,
675                    schema,
676                    from_partitioned_ds,
677                    keys,
678                    aggs,
679                )))
680            } else {
681                let input = recurse!(input, state)?;
682                Ok(Box::new(executors::GroupByExec::new(
683                    input,
684                    phys_keys,
685                    phys_aggs,
686                    apply,
687                    maintain_order,
688                    input_schema,
689                    options.slice,
690                )))
691            }
692        },
693        Join {
694            input_left,
695            input_right,
696            left_on,
697            right_on,
698            options,
699            schema,
700            ..
701        } => {
702            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
703            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();
704
            // Both sides are converted before either result is checked for an error.
705            let (input_left, input_right) = state.with_new_branch(|new_state| {
706                (
707                    recurse!(input_left, new_state),
708                    recurse!(input_right, new_state),
709                )
710            });
711            let input_left = input_left?;
712            let input_right = input_right?;
713
714            // Todo! remove the force option. It can deadlock.
            // `force_parallel` overrides `allow_parallel`.
715            let parallel = if options.force_parallel {
716                true
717            } else {
718                options.allow_parallel
719            };
720
721            let left_on = create_physical_expressions_from_irs(
722                &left_on,
723                Context::Default,
724                expr_arena,
725                &schema_left,
726                &mut ExpressionConversionState::new(true),
727            )?;
728            let right_on = create_physical_expressions_from_irs(
729                &right_on,
730                Context::Default,
731                expr_arena,
732                &schema_right,
733                &mut ExpressionConversionState::new(true),
734            )?;
735            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
736
737            // Convert the join options, to the physical join options. This requires the physical
738            // planner, so we do this last minute.
739            let join_type_options = options
740                .options
741                .map(|o| {
742                    o.compile(|e| {
                        // The expression is built against the join's output `schema`.
743                        let phys_expr = create_physical_expr(
744                            e,
745                            Context::Default,
746                            expr_arena,
747                            &schema,
748                            &mut ExpressionConversionState::new(false),
749                        )?;
750
751                        let execution_state = ExecutionState::default();
752
                        // The compiled callback evaluates the expression to a boolean mask and
                        // filters the frame with it.
753                        Ok(Arc::new(move |df: DataFrame| {
754                            let mask = phys_expr.evaluate(&df, &execution_state)?;
755                            let mask = mask.as_materialized_series();
756                            let mask = mask.bool()?;
757                            df._filter_seq(mask)
758                        }))
759                    })
760                })
761                .transpose()?;
762
763            Ok(Box::new(executors::JoinExec::new(
764                input_left,
765                input_right,
766                left_on,
767                right_on,
768                parallel,
769                options.args,
770                join_type_options,
771            )))
772        },
773        HStack {
774            input,
775            exprs,
776            schema: output_schema,
777            options,
778        } => {
779            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
780            let input = recurse!(input, state)?;
781
782            let allow_vertical_parallelism = options.should_broadcast
783                && exprs
784                    .iter()
785                    .all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));
786
787            let mut state =
788                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());
789
790            let phys_exprs = create_physical_expressions_from_irs(
791                &exprs,
792                Context::Default,
793                expr_arena,
794                &input_schema,
795                &mut state,
796            )?;
797            Ok(Box::new(executors::StackExec {
798                input,
799                has_windows: state.has_windows,
800                exprs: phys_exprs,
801                input_schema,
802                output_schema,
803                options,
804                allow_vertical_parallelism,
805            }))
806        },
807        MapFunction {
808            input, function, ..
809        } => {
810            let input = recurse!(input, state)?;
811            Ok(Box::new(executors::UdfExec { input, function }))
812        },
813        ExtContext {
814            input, contexts, ..
815        } => {
816            let input = recurse!(input, state)?;
            // NOTE(review): the contexts are converted with the shared `state`, not inside
            // `with_new_branch` like the other multi-input nodes — confirm this is intended.
817            let contexts = contexts
818                .into_iter()
819                .map(|node| recurse!(node, state))
820                .collect::<PolarsResult<_>>()?;
821            Ok(Box::new(executors::ExternalContext { input, contexts }))
822        },
823        SimpleProjection { input, columns } => {
824            let input = recurse!(input, state)?;
825            let exec = executors::ProjectionSimple { input, columns };
826            Ok(Box::new(exec))
827        },
828        #[cfg(feature = "merge_sorted")]
829        MergeSorted {
830            input_left,
831            input_right,
832            key,
833        } => {
834            let (input_left, input_right) = state.with_new_branch(|new_state| {
835                (
836                    recurse!(input_left, new_state),
837                    recurse!(input_right, new_state),
838                )
839            });
840            let input_left = input_left?;
841            let input_right = input_right?;
842
843            let exec = executors::MergeSorted {
844                input_left,
845                input_right,
846                key,
847            };
848            Ok(Box::new(exec))
849        },
850        Invalid => unreachable!(),
851    }
852}
853
854pub fn create_scan_predicate(
855    predicate: &ExprIR,
856    expr_arena: &mut Arena<AExpr>,
857    schema: &Arc<Schema>,
858    state: &mut ExpressionConversionState,
859    create_skip_batch_predicate: bool,
860    create_column_predicates: bool,
861) -> PolarsResult<ScanPredicate> {
862    let phys_predicate =
863        create_physical_expr(predicate, Context::Default, expr_arena, schema, state)?;
864    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
865        predicate.node(),
866        expr_arena,
867    )));
868
869    let mut skip_batch_predicate = None;
870
871    if create_skip_batch_predicate {
872        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
873            let expr = ExprIR::new(node, predicate.output_name_inner().clone());
874
875            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
876                eprintln!("predicate: {}", predicate.display(expr_arena));
877                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
878            }
879
880            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());
881
882            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
883            for (col, dtype) in schema.iter() {
884                if !live_columns.contains(col) {
885                    continue;
886                }
887
888                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
889                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
890                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
891            }
892
893            skip_batch_predicate = Some(create_physical_expr(
894                &expr,
895                Context::Default,
896                expr_arena,
897                &Arc::new(skip_batch_schema),
898                state,
899            )?);
900        }
901    }
902
903    let column_predicates = if create_column_predicates {
904        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
905        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
906            eprintln!("column_predicates: {{");
907            eprintln!("  [");
908            for (pred, spec) in column_predicates.predicates.values() {
909                eprintln!(
910                    "    {} ({spec:?}),",
911                    ExprIRDisplay::display_node(*pred, expr_arena)
912                );
913            }
914            eprintln!("  ],");
915            eprintln!(
916                "  is_sumwise_complete: {}",
917                column_predicates.is_sumwise_complete
918            );
919            eprintln!("}}");
920        }
921        PhysicalColumnPredicates {
922            predicates: column_predicates
923                .predicates
924                .into_iter()
925                .map(|(n, (p, s))| {
926                    PolarsResult::Ok((
927                        n,
928                        (
929                            create_physical_expr(
930                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
931                                Context::Default,
932                                expr_arena,
933                                schema,
934                                state,
935                            )?,
936                            s,
937                        ),
938                    ))
939                })
940                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
941            is_sumwise_complete: column_predicates.is_sumwise_complete,
942        }
943    } else {
944        PhysicalColumnPredicates {
945            predicates: PlHashMap::default(),
946            is_sumwise_complete: false,
947        }
948    };
949
950    PolarsResult::Ok(ScanPredicate {
951        predicate: phys_predicate,
952        live_columns,
953        skip_batch_predicate,
954        column_predicates,
955    })
956}