// polars_mem_engine/planner/lp.rs

1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::global::_set_n_rows_for_scan;
5use polars_plan::plans::expr_ir::ExprIR;
6use polars_utils::format_pl_smallstr;
7use recursive::recursive;
8
9use self::expr_ir::OutputName;
10use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
11#[cfg(feature = "python")]
12use self::python_dsl::PythonScanSource;
13use super::super::executors::{self, Executor};
14use super::*;
15use crate::ScanPredicate;
16use crate::executors::{CachePrefiller, SinkExecutor};
17use crate::predicate::PhysicalColumnPredicates;
18
/// Callback that builds a streaming [`Executor`] for the subtree rooted at the
/// given node (provided when the new-streaming feature is available; see the
/// `expect` message where it is invoked during scan lowering).
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
21
22fn partitionable_gb(
23    keys: &[ExprIR],
24    aggs: &[ExprIR],
25    input_schema: &Schema,
26    expr_arena: &Arena<AExpr>,
27    apply: &Option<Arc<dyn DataFrameUdf>>,
28) -> bool {
29    // checks:
30    //      1. complex expressions in the group_by itself are also not partitionable
31    //          in this case anything more than col("foo")
32    //      2. a custom function cannot be partitioned
33    //      3. we don't bother with more than 2 keys, as the cardinality likely explodes
34    //         by the combinations
35    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
36        // complex expressions in the group_by itself are also not partitionable
37        // in this case anything more than col("foo")
38        for key in keys {
39            if (expr_arena).iter(key.node()).count() > 1
40                || has_aexpr(key.node(), expr_arena, |ae| match ae {
41                    AExpr::Literal(lv) => !lv.is_scalar(),
42                    _ => false,
43                })
44            {
45                return false;
46            }
47        }
48
49        can_pre_agg_exprs(aggs, expr_arena, input_schema)
50    } else {
51        false
52    }
53}
54
/// State threaded through the IR-to-executor conversion.
#[derive(Clone)]
struct ConversionState {
    // Set when the subtree converted so far contained a cache (or a scan that
    // was wrapped in a cache); propagated back up by `with_new_branch`.
    has_cache_child: bool,
    // Set when an ancestor introduced a cache. In that case IR nodes may be
    // visited more than once, so they are cloned instead of taken from the arena.
    has_cache_parent: bool,
}
60
61impl ConversionState {
62    fn new() -> PolarsResult<Self> {
63        Ok(ConversionState {
64            has_cache_child: false,
65            has_cache_parent: false,
66        })
67    }
68
69    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
70        let mut new_state = self.clone();
71        new_state.has_cache_child = false;
72        let out = func(&mut new_state);
73        self.has_cache_child = new_state.has_cache_child;
74        out
75    }
76}
77
78pub fn create_physical_plan(
79    root: Node,
80    lp_arena: &mut Arena<IR>,
81    expr_arena: &mut Arena<AExpr>,
82    build_streaming_executor: Option<StreamingExecutorBuilder>,
83) -> PolarsResult<Box<dyn Executor>> {
84    let mut state = ConversionState::new()?;
85    let mut cache_nodes = Default::default();
86    let plan = create_physical_plan_impl(
87        root,
88        lp_arena,
89        expr_arena,
90        &mut state,
91        &mut cache_nodes,
92        build_streaming_executor,
93    )?;
94
95    if cache_nodes.is_empty() {
96        Ok(plan)
97    } else {
98        Ok(Box::new(CachePrefiller {
99            caches: cache_nodes,
100            phys_plan: plan,
101        }))
102    }
103}
104
/// Result of lowering several roots at once.
pub struct MultiplePhysicalPlans {
    /// Executor that pre-fills all shared cache nodes; `None` when no caches
    /// were discovered during conversion.
    pub cache_prefiller: Option<Box<dyn Executor>>,
    /// One physical plan per input root, in the same order as the given roots.
    pub physical_plans: Vec<Box<dyn Executor>>,
}
109pub fn create_multiple_physical_plans(
110    roots: &[Node],
111    lp_arena: &mut Arena<IR>,
112    expr_arena: &mut Arena<AExpr>,
113    build_streaming_executor: Option<StreamingExecutorBuilder>,
114) -> PolarsResult<MultiplePhysicalPlans> {
115    let mut state = ConversionState::new()?;
116    let mut cache_nodes = Default::default();
117    let plans = state.with_new_branch(|new_state| {
118        roots
119            .iter()
120            .map(|&node| {
121                create_physical_plan_impl(
122                    node,
123                    lp_arena,
124                    expr_arena,
125                    new_state,
126                    &mut cache_nodes,
127                    build_streaming_executor,
128                )
129            })
130            .collect::<PolarsResult<Vec<_>>>()
131    })?;
132
133    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
134        struct Empty;
135        impl Executor for Empty {
136            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
137                Ok(DataFrame::empty())
138            }
139        }
140        Box::new(CachePrefiller {
141            caches: cache_nodes,
142            phys_plan: Box::new(Empty),
143        }) as _
144    });
145
146    Ok(MultiplePhysicalPlans {
147        cache_prefiller,
148        physical_plans: plans,
149    })
150}
151
/// Lowers the predicate of a Python scan into an executable form.
///
/// Returns the physical predicate to apply after the scan (if one is needed)
/// together with an optional serialized copy of the predicate expression that
/// can be handed to the Python reader.
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        // Convert to a pyarrow eval string.
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            ) {
                // The predicate is replaced in-place by its pyarrow form.
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // We don't have to use a physical expression as pyarrow deals with the filter.
                None
            } else {
                // Not expressible as a pyarrow eval string; fall back to a
                // physical expression evaluated after the scan.
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        }
        // Convert to physical expression for the case the reader cannot consume the predicate.
        else {
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        // No Polars predicate attached (either none, or already pyarrow form).
        None
    };

    Ok((predicate, predicate_serialized))
}
203
204#[recursive]
205fn create_physical_plan_impl(
206    root: Node,
207    lp_arena: &mut Arena<IR>,
208    expr_arena: &mut Arena<AExpr>,
209    state: &mut ConversionState,
210    // Cache nodes in order of discovery
211    cache_nodes: &mut PlIndexMap<usize, Box<executors::CacheExec>>,
212    build_streaming_executor: Option<StreamingExecutorBuilder>,
213) -> PolarsResult<Box<dyn Executor>> {
214    use IR::*;
215
216    macro_rules! recurse {
217        ($node:expr, $state: expr) => {
218            create_physical_plan_impl(
219                $node,
220                lp_arena,
221                expr_arena,
222                $state,
223                cache_nodes,
224                build_streaming_executor,
225            )
226        };
227    }
228
229    let logical_plan = if state.has_cache_parent || matches!(lp_arena.get(root), IR::Scan { .. }) {
230        lp_arena.get(root).clone()
231    } else {
232        lp_arena.take(root)
233    };
234
235    match logical_plan {
236        #[cfg(feature = "python")]
237        PythonScan { mut options } => {
238            let mut expr_conv_state = ExpressionConversionState::new(true);
239            let (predicate, predicate_serialized) =
240                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
241            Ok(Box::new(executors::PythonScanExec {
242                options,
243                predicate,
244                predicate_serialized,
245            }))
246        },
247        Sink { input, payload } => {
248            let input = recurse!(input, state)?;
249            match payload {
250                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
251                    input,
252                    name: "mem".to_string(),
253                    f: Box::new(move |df, _state| Ok(Some(df))),
254                })),
255                SinkTypeIR::File(FileSinkType {
256                    file_type,
257                    target,
258                    sink_options,
259                    cloud_options,
260                }) => {
261                    let name: &'static str = match &file_type {
262                        #[cfg(feature = "parquet")]
263                        FileType::Parquet(_) => "parquet",
264                        #[cfg(feature = "ipc")]
265                        FileType::Ipc(_) => "ipc",
266                        #[cfg(feature = "csv")]
267                        FileType::Csv(_) => "csv",
268                        #[cfg(feature = "json")]
269                        FileType::Json(_) => "json",
270                        #[allow(unreachable_patterns)]
271                        _ => panic!("enable filetype feature"),
272                    };
273
274                    Ok(Box::new(SinkExecutor {
275                        input,
276                        name: name.to_string(),
277                        f: Box::new(move |mut df, _state| {
278                            let mut file = target
279                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
280                            let writer = &mut *file;
281
282                            use std::io::BufWriter;
283                            match &file_type {
284                                #[cfg(feature = "parquet")]
285                                FileType::Parquet(options) => {
286                                    use polars_io::parquet::write::ParquetWriter;
287                                    ParquetWriter::new(BufWriter::new(writer))
288                                        .with_compression(options.compression)
289                                        .with_statistics(options.statistics)
290                                        .with_row_group_size(options.row_group_size)
291                                        .with_data_page_size(options.data_page_size)
292                                        .with_key_value_metadata(options.key_value_metadata.clone())
293                                        .finish(&mut df)?;
294                                },
295                                #[cfg(feature = "ipc")]
296                                FileType::Ipc(options) => {
297                                    use polars_io::SerWriter;
298                                    use polars_io::ipc::IpcWriter;
299                                    IpcWriter::new(BufWriter::new(writer))
300                                        .with_compression(options.compression)
301                                        .with_compat_level(options.compat_level)
302                                        .finish(&mut df)?;
303                                },
304                                #[cfg(feature = "csv")]
305                                FileType::Csv(options) => {
306                                    use polars_io::SerWriter;
307                                    use polars_io::csv::write::CsvWriter;
308                                    CsvWriter::new(BufWriter::new(writer))
309                                        .include_bom(options.include_bom)
310                                        .include_header(options.include_header)
311                                        .with_separator(options.serialize_options.separator)
312                                        .with_line_terminator(
313                                            options.serialize_options.line_terminator.clone(),
314                                        )
315                                        .with_quote_char(options.serialize_options.quote_char)
316                                        .with_batch_size(options.batch_size)
317                                        .with_datetime_format(
318                                            options.serialize_options.datetime_format.clone(),
319                                        )
320                                        .with_date_format(
321                                            options.serialize_options.date_format.clone(),
322                                        )
323                                        .with_time_format(
324                                            options.serialize_options.time_format.clone(),
325                                        )
326                                        .with_float_scientific(
327                                            options.serialize_options.float_scientific,
328                                        )
329                                        .with_float_precision(
330                                            options.serialize_options.float_precision,
331                                        )
332                                        .with_null_value(options.serialize_options.null.clone())
333                                        .with_quote_style(options.serialize_options.quote_style)
334                                        .finish(&mut df)?;
335                                },
336                                #[cfg(feature = "json")]
337                                FileType::Json(_options) => {
338                                    use polars_io::SerWriter;
339                                    use polars_io::json::{JsonFormat, JsonWriter};
340
341                                    JsonWriter::new(BufWriter::new(writer))
342                                        .with_json_format(JsonFormat::JsonLines)
343                                        .finish(&mut df)?;
344                                },
345                                #[allow(unreachable_patterns)]
346                                _ => panic!("enable filetype feature"),
347                            }
348
349                            file.sync_on_close(sink_options.sync_on_close)?;
350                            file.close()?;
351
352                            Ok(None)
353                        }),
354                    }))
355                },
356
357                SinkTypeIR::Partition { .. } => {
358                    polars_bail!(InvalidOperation:
359                        "partition sinks not yet supported in standard engine."
360                    )
361                },
362            }
363        },
364        SinkMultiple { .. } => {
365            unreachable!("should be handled with create_multiple_physical_plans")
366        },
367        Union { inputs, options } => {
368            let inputs = state.with_new_branch(|new_state| {
369                inputs
370                    .into_iter()
371                    .map(|node| recurse!(node, new_state))
372                    .collect::<PolarsResult<Vec<_>>>()
373            });
374            let inputs = inputs?;
375            Ok(Box::new(executors::UnionExec { inputs, options }))
376        },
377        HConcat {
378            inputs, options, ..
379        } => {
380            let inputs = state.with_new_branch(|new_state| {
381                inputs
382                    .into_iter()
383                    .map(|node| recurse!(node, new_state))
384                    .collect::<PolarsResult<Vec<_>>>()
385            });
386
387            let inputs = inputs?;
388
389            Ok(Box::new(executors::HConcatExec { inputs, options }))
390        },
391        Slice { input, offset, len } => {
392            let input = recurse!(input, state)?;
393            Ok(Box::new(executors::SliceExec { input, offset, len }))
394        },
395        Filter { input, predicate } => {
396            let mut streamable =
397                is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
398            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
399            if streamable {
400                // This can cause problems with string caches
401                streamable = !input_schema
402                    .iter_values()
403                    .any(|dt| dt.contains_categoricals())
404                    || {
405                        #[cfg(feature = "dtype-categorical")]
406                        {
407                            polars_core::using_string_cache()
408                        }
409
410                        #[cfg(not(feature = "dtype-categorical"))]
411                        {
412                            false
413                        }
414                    }
415            }
416            let input = recurse!(input, state)?;
417            let mut state = ExpressionConversionState::new(true);
418            let predicate = create_physical_expr(
419                &predicate,
420                Context::Default,
421                expr_arena,
422                &input_schema,
423                &mut state,
424            )?;
425            Ok(Box::new(executors::FilterExec::new(
426                predicate,
427                input,
428                state.has_windows,
429                streamable,
430            )))
431        },
432        #[allow(unused_variables)]
433        Scan {
434            sources,
435            file_info,
436            hive_parts,
437            output_schema,
438            scan_type,
439            predicate,
440            mut unified_scan_args,
441            id: scan_mem_id,
442        } => {
443            unified_scan_args.pre_slice = if let Some(mut slice) = unified_scan_args.pre_slice {
444                *slice.len_mut() = _set_n_rows_for_scan(Some(slice.len())).unwrap();
445                Some(slice)
446            } else {
447                _set_n_rows_for_scan(None)
448                    .map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len })
449            };
450
451            let mut expr_conversion_state = ExpressionConversionState::new(true);
452
453            let mut create_skip_batch_predicate = false;
454            #[cfg(feature = "parquet")]
455            {
456                create_skip_batch_predicate |= matches!(
457                    &*scan_type,
458                    FileScan::Parquet {
459                        options: polars_io::prelude::ParquetOptions {
460                            use_statistics: true,
461                            ..
462                        },
463                        ..
464                    }
465                );
466            }
467
468            let predicate = predicate
469                .map(|predicate| {
470                    create_scan_predicate(
471                        &predicate,
472                        expr_arena,
473                        output_schema.as_ref().unwrap_or(&file_info.schema),
474                        &mut expr_conversion_state,
475                        create_skip_batch_predicate,
476                        false,
477                    )
478                })
479                .transpose()?;
480
481            match *scan_type {
482                FileScan::Anonymous { function, .. } => {
483                    Ok(Box::new(executors::AnonymousScanExec {
484                        function,
485                        predicate,
486                        unified_scan_args,
487                        file_info,
488                        output_schema,
489                        predicate_has_windows: expr_conversion_state.has_windows,
490                    }))
491                },
492                #[allow(unreachable_patterns)]
493                _ => {
494                    // We wrap in a CacheExec so that the new-streaming scan gets called from the
495                    // CachePrefiller. This ensures it is called from outside of rayon to avoid
496                    // deadlocks.
497                    //
498                    // Note that we don't actually want it to be kept in memory after being used,
499                    // so we set the count to have it be dropped after a single use (or however
500                    // many times it is referenced after CSE (subplan)).
501                    state.has_cache_parent = true;
502                    state.has_cache_child = true;
503
504                    let scan_mem_id: usize = scan_mem_id.to_usize();
505
506                    if !cache_nodes.contains_key(&scan_mem_id) {
507                        let build_func = build_streaming_executor
508                            .expect("invalid build. Missing feature new-streaming");
509
510                        let executor = build_func(root, lp_arena, expr_arena)?;
511
512                        cache_nodes.insert(
513                            scan_mem_id,
514                            Box::new(executors::CacheExec {
515                                input: Some(executor),
516                                id: scan_mem_id,
517                                // This is (n_hits - 1), because the drop logic is `fetch_sub(1) == 0`.
518                                count: 0,
519                                is_new_streaming_scan: true,
520                            }),
521                        );
522                    } else {
523                        // Already exists - this scan IR is under a CSE (subplan). We need to
524                        // increment the cache hit count here.
525                        let cache_exec = cache_nodes.get_mut(&scan_mem_id).unwrap();
526                        cache_exec.count = cache_exec.count.saturating_add(1);
527                    }
528
529                    Ok(Box::new(executors::CacheExec {
530                        id: scan_mem_id,
531                        // Rest of the fields don't matter - the actual node was inserted into
532                        // `cache_nodes`.
533                        input: None,
534                        count: Default::default(),
535                        is_new_streaming_scan: true,
536                    }))
537                },
538                #[allow(unreachable_patterns)]
539                _ => unreachable!(),
540            }
541        },
542
543        Select {
544            expr,
545            input,
546            schema: _schema,
547            options,
548            ..
549        } => {
550            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
551            let input = recurse!(input, state)?;
552            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
553            let phys_expr = create_physical_expressions_from_irs(
554                &expr,
555                Context::Default,
556                expr_arena,
557                &input_schema,
558                &mut state,
559            )?;
560
561            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena))
562                // If all columns are literal we would get a 1 row per thread.
563                && !phys_expr.iter().all(|p| {
564                    p.is_literal()
565                });
566
567            Ok(Box::new(executors::ProjectionExec {
568                input,
569                expr: phys_expr,
570                has_windows: state.has_windows,
571                input_schema,
572                #[cfg(test)]
573                schema: _schema,
574                options,
575                allow_vertical_parallelism,
576            }))
577        },
578        DataFrameScan {
579            df, output_schema, ..
580        } => Ok(Box::new(executors::DataFrameExec {
581            df,
582            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
583        })),
584        Sort {
585            input,
586            by_column,
587            slice,
588            sort_options,
589        } => {
590            let input_schema = lp_arena.get(input).schema(lp_arena);
591            let by_column = create_physical_expressions_from_irs(
592                &by_column,
593                Context::Default,
594                expr_arena,
595                input_schema.as_ref(),
596                &mut ExpressionConversionState::new(true),
597            )?;
598            let input = recurse!(input, state)?;
599            Ok(Box::new(executors::SortExec {
600                input,
601                by_column,
602                slice,
603                sort_options,
604            }))
605        },
606        Cache {
607            input,
608            id,
609            cache_hits,
610        } => {
611            state.has_cache_parent = true;
612            state.has_cache_child = true;
613
614            if !cache_nodes.contains_key(&id) {
615                let input = recurse!(input, state)?;
616
617                let cache = Box::new(executors::CacheExec {
618                    id,
619                    input: Some(input),
620                    count: cache_hits,
621                    is_new_streaming_scan: false,
622                });
623
624                cache_nodes.insert(id, cache);
625            }
626
627            Ok(Box::new(executors::CacheExec {
628                id,
629                input: None,
630                count: cache_hits,
631                is_new_streaming_scan: false,
632            }))
633        },
634        Distinct { input, options } => {
635            let input = recurse!(input, state)?;
636            Ok(Box::new(executors::UniqueExec { input, options }))
637        },
638        GroupBy {
639            input,
640            keys,
641            aggs,
642            apply,
643            schema,
644            maintain_order,
645            options,
646        } => {
647            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
648            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
649            let phys_keys = create_physical_expressions_from_irs(
650                &keys,
651                Context::Default,
652                expr_arena,
653                &input_schema,
654                &mut ExpressionConversionState::new(true),
655            )?;
656            let phys_aggs = create_physical_expressions_from_irs(
657                &aggs,
658                Context::Aggregation,
659                expr_arena,
660                &input_schema,
661                &mut ExpressionConversionState::new(true),
662            )?;
663
664            let _slice = options.slice;
665            #[cfg(feature = "dynamic_group_by")]
666            if let Some(options) = options.dynamic {
667                let input = recurse!(input, state)?;
668                return Ok(Box::new(executors::GroupByDynamicExec {
669                    input,
670                    keys: phys_keys,
671                    aggs: phys_aggs,
672                    options,
673                    input_schema,
674                    slice: _slice,
675                    apply,
676                }));
677            }
678
679            #[cfg(feature = "dynamic_group_by")]
680            if let Some(options) = options.rolling {
681                let input = recurse!(input, state)?;
682                return Ok(Box::new(executors::GroupByRollingExec {
683                    input,
684                    keys: phys_keys,
685                    aggs: phys_aggs,
686                    options,
687                    input_schema,
688                    slice: _slice,
689                    apply,
690                }));
691            }
692
693            // We first check if we can partition the group_by on the latest moment.
694            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
695            if partitionable {
696                let from_partitioned_ds = (&*lp_arena).iter(input).any(|(_, lp)| {
697                    if let Union { options, .. } = lp {
698                        options.from_partitioned_ds
699                    } else {
700                        false
701                    }
702                });
703                let input = recurse!(input, state)?;
704                let keys = keys
705                    .iter()
706                    .map(|e| e.to_expr(expr_arena))
707                    .collect::<Vec<_>>();
708                let aggs = aggs
709                    .iter()
710                    .map(|e| e.to_expr(expr_arena))
711                    .collect::<Vec<_>>();
712                Ok(Box::new(executors::PartitionGroupByExec::new(
713                    input,
714                    phys_keys,
715                    phys_aggs,
716                    maintain_order,
717                    options.slice,
718                    input_schema,
719                    schema,
720                    from_partitioned_ds,
721                    keys,
722                    aggs,
723                )))
724            } else {
725                let input = recurse!(input, state)?;
726                Ok(Box::new(executors::GroupByExec::new(
727                    input,
728                    phys_keys,
729                    phys_aggs,
730                    apply,
731                    maintain_order,
732                    input_schema,
733                    options.slice,
734                )))
735            }
736        },
737        Join {
738            input_left,
739            input_right,
740            left_on,
741            right_on,
742            options,
743            schema,
744            ..
745        } => {
746            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
747            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();
748
749            let (input_left, input_right) = state.with_new_branch(|new_state| {
750                (
751                    recurse!(input_left, new_state),
752                    recurse!(input_right, new_state),
753                )
754            });
755            let input_left = input_left?;
756            let input_right = input_right?;
757
758            // Todo! remove the force option. It can deadlock.
759            let parallel = if options.force_parallel {
760                true
761            } else {
762                options.allow_parallel
763            };
764
765            let left_on = create_physical_expressions_from_irs(
766                &left_on,
767                Context::Default,
768                expr_arena,
769                &schema_left,
770                &mut ExpressionConversionState::new(true),
771            )?;
772            let right_on = create_physical_expressions_from_irs(
773                &right_on,
774                Context::Default,
775                expr_arena,
776                &schema_right,
777                &mut ExpressionConversionState::new(true),
778            )?;
779            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
780
781            // Convert the join options, to the physical join options. This requires the physical
782            // planner, so we do this last minute.
783            let join_type_options = options
784                .options
785                .map(|o| {
786                    o.compile(|e| {
787                        let phys_expr = create_physical_expr(
788                            e,
789                            Context::Default,
790                            expr_arena,
791                            &schema,
792                            &mut ExpressionConversionState::new(false),
793                        )?;
794
795                        let execution_state = ExecutionState::default();
796
797                        Ok(Arc::new(move |df: DataFrame| {
798                            let mask = phys_expr.evaluate(&df, &execution_state)?;
799                            let mask = mask.as_materialized_series();
800                            let mask = mask.bool()?;
801                            df._filter_seq(mask)
802                        }))
803                    })
804                })
805                .transpose()?;
806
807            Ok(Box::new(executors::JoinExec::new(
808                input_left,
809                input_right,
810                left_on,
811                right_on,
812                parallel,
813                options.args,
814                join_type_options,
815            )))
816        },
817        HStack {
818            input,
819            exprs,
820            schema: output_schema,
821            options,
822        } => {
823            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
824            let input = recurse!(input, state)?;
825
826            let allow_vertical_parallelism = options.should_broadcast
827                && exprs
828                    .iter()
829                    .all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));
830
831            let mut state =
832                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());
833
834            let phys_exprs = create_physical_expressions_from_irs(
835                &exprs,
836                Context::Default,
837                expr_arena,
838                &input_schema,
839                &mut state,
840            )?;
841            Ok(Box::new(executors::StackExec {
842                input,
843                has_windows: state.has_windows,
844                exprs: phys_exprs,
845                input_schema,
846                output_schema,
847                options,
848                allow_vertical_parallelism,
849            }))
850        },
851        MapFunction {
852            input, function, ..
853        } => {
854            let input = recurse!(input, state)?;
855            Ok(Box::new(executors::UdfExec { input, function }))
856        },
857        ExtContext {
858            input, contexts, ..
859        } => {
860            let input = recurse!(input, state)?;
861            let contexts = contexts
862                .into_iter()
863                .map(|node| recurse!(node, state))
864                .collect::<PolarsResult<_>>()?;
865            Ok(Box::new(executors::ExternalContext { input, contexts }))
866        },
867        SimpleProjection { input, columns } => {
868            let input = recurse!(input, state)?;
869            let exec = executors::ProjectionSimple { input, columns };
870            Ok(Box::new(exec))
871        },
872        #[cfg(feature = "merge_sorted")]
873        MergeSorted {
874            input_left,
875            input_right,
876            key,
877        } => {
878            let (input_left, input_right) = state.with_new_branch(|new_state| {
879                (
880                    recurse!(input_left, new_state),
881                    recurse!(input_right, new_state),
882                )
883            });
884            let input_left = input_left?;
885            let input_right = input_right?;
886
887            let exec = executors::MergeSorted {
888                input_left,
889                input_right,
890                key,
891            };
892            Ok(Box::new(exec))
893        },
894        Invalid => unreachable!(),
895    }
896}
897
898pub fn create_scan_predicate(
899    predicate: &ExprIR,
900    expr_arena: &mut Arena<AExpr>,
901    schema: &Arc<Schema>,
902    state: &mut ExpressionConversionState,
903    create_skip_batch_predicate: bool,
904    create_column_predicates: bool,
905) -> PolarsResult<ScanPredicate> {
906    let phys_predicate =
907        create_physical_expr(predicate, Context::Default, expr_arena, schema, state)?;
908    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
909        predicate.node(),
910        expr_arena,
911    )));
912
913    let mut skip_batch_predicate = None;
914
915    if create_skip_batch_predicate {
916        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
917            let expr = ExprIR::new(node, predicate.output_name_inner().clone());
918
919            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
920                eprintln!("predicate: {}", predicate.display(expr_arena));
921                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
922            }
923
924            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());
925
926            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
927            for (col, dtype) in schema.iter() {
928                if !live_columns.contains(col) {
929                    continue;
930                }
931
932                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
933                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
934                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
935            }
936
937            skip_batch_predicate = Some(create_physical_expr(
938                &expr,
939                Context::Default,
940                expr_arena,
941                &Arc::new(skip_batch_schema),
942                state,
943            )?);
944        }
945    }
946
947    let column_predicates = if create_column_predicates {
948        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
949        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
950            eprintln!("column_predicates: {{");
951            eprintln!("  [");
952            for (pred, spec) in column_predicates.predicates.values() {
953                eprintln!(
954                    "    {} ({spec:?}),",
955                    ExprIRDisplay::display_node(*pred, expr_arena)
956                );
957            }
958            eprintln!("  ],");
959            eprintln!(
960                "  is_sumwise_complete: {}",
961                column_predicates.is_sumwise_complete
962            );
963            eprintln!("}}");
964        }
965        PhysicalColumnPredicates {
966            predicates: column_predicates
967                .predicates
968                .into_iter()
969                .map(|(n, (p, s))| {
970                    PolarsResult::Ok((
971                        n,
972                        (
973                            create_physical_expr(
974                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
975                                Context::Default,
976                                expr_arena,
977                                schema,
978                                state,
979                            )?,
980                            s,
981                        ),
982                    ))
983                })
984                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
985            is_sumwise_complete: column_predicates.is_sumwise_complete,
986        }
987    } else {
988        PhysicalColumnPredicates {
989            predicates: PlHashMap::default(),
990            is_sumwise_complete: false,
991        }
992    };
993
994    PolarsResult::Ok(ScanPredicate {
995        predicate: phys_predicate,
996        live_columns,
997        skip_batch_predicate,
998        column_predicates,
999    })
1000}