// polars_plan/plans/conversion/dsl_to_ir/mod.rs

1use arrow::datatypes::ArrowSchemaRef;
2use either::Either;
3use expr_expansion::rewrite_projections;
4use hive::hive_partitions_from_paths;
5use polars_core::chunked_array::cast::CastOptions;
6use polars_core::config::verbose;
7use polars_utils::plpath::PlPath;
8use polars_utils::unique_id::UniqueId;
9
10use super::convert_utils::SplitPredicates;
11use super::stack_opt::ConversionOptimizer;
12use super::*;
13
14mod concat;
15mod datatype_fn_to_ir;
16mod expr_expansion;
17mod expr_to_ir;
18mod functions;
19mod join;
20mod scans;
21mod utils;
22pub use expr_expansion::{expand_expression, is_regex_projection, prepare_projection};
23pub use expr_to_ir::{ExprToIRContext, to_expr_ir};
24use expr_to_ir::{to_expr_ir_materialized_lit, to_expr_irs};
25use utils::DslConversionContext;
26
/// Builds the context label attached to conversion errors.
///
/// The invocation tokens are stringified verbatim and wrapped in single quotes, e.g.
/// `failed_here!(vertical concat)` yields `'vertical concat'`. The resulting `String` is
/// converted with `.into()` into whatever type the call site expects.
macro_rules! failed_here {
    ($($tokens:tt)*) => {{
        let mut label = String::from("'");
        label.push_str(stringify!($($tokens)*));
        label.push('\'');
        label.into()
    }};
}
32pub(super) use failed_here;
33
34pub fn to_alp(
35    lp: DslPlan,
36    expr_arena: &mut Arena<AExpr>,
37    lp_arena: &mut Arena<IR>,
38    // Only `SIMPLIFY_EXPR`, `TYPE_COERCION`, `TYPE_CHECK` are respected.
39    opt_flags: &mut OptFlags,
40) -> PolarsResult<Node> {
41    let conversion_optimizer = ConversionOptimizer::new(
42        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
43        opt_flags.contains(OptFlags::TYPE_COERCION),
44        opt_flags.contains(OptFlags::TYPE_CHECK),
45    );
46
47    let mut ctxt = DslConversionContext {
48        expr_arena,
49        lp_arena,
50        conversion_optimizer,
51        opt_flags,
52        nodes_scratch: &mut unitvec![],
53        cache_file_info: Default::default(),
54        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
55        verbose: verbose(),
56        seen_caches: Default::default(),
57    };
58
59    match to_alp_impl(lp, &mut ctxt) {
60        Ok(out) => Ok(out),
61        Err(err) => {
62            if opt_flags.contains(OptFlags::EAGER) {
63                // If we dispatched to the lazy engine from the eager API, we don't want to resolve
64                // where in the query plan it went wrong. It is clear from the backtrace anyway.
65                return Err(err.remove_context());
66            };
67
68            let Some(ir_until_then) = lp_arena.last_node() else {
69                return Err(err);
70            };
71
72            let node_name = if let PolarsError::Context { msg, .. } = &err {
73                msg
74            } else {
75                "THIS_NODE"
76            };
77            let plan = IRPlan::new(
78                ir_until_then,
79                std::mem::take(lp_arena),
80                std::mem::take(expr_arena),
81            );
82            let location = format!("{}", plan.display());
83            Err(err.wrap_msg(|msg| {
84                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
85            }))
86        },
87    }
88}
89
90fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
91    let lp_node = ctxt.lp_arena.add(lp);
92    ctxt.conversion_optimizer
93        .optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node, false)
94        .map_err(|e| e.context(format!("'{name}' failed").into()))?;
95
96    Ok(lp_node)
97}
98
99/// converts LogicalPlan to IR
100/// it adds expressions & lps to the respective arenas as it traverses the plan
101/// finally it returns the top node of the logical plan
102#[recursive]
103pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
104    let owned = Arc::unwrap_or_clone;
105
106    let v = match lp {
107        DslPlan::Scan {
108            sources,
109            unified_scan_args,
110            scan_type,
111            cached_ir,
112        } => scans::dsl_to_ir(sources, unified_scan_args, scan_type, cached_ir, ctxt)?,
113        #[cfg(feature = "python")]
114        DslPlan::PythonScan { options } => {
115            use crate::dsl::python_dsl::PythonOptionsDsl;
116
117            let schema = options.get_schema()?;
118
119            let PythonOptionsDsl {
120                scan_fn,
121                schema_fn: _,
122                python_source,
123                validate_schema,
124                is_pure,
125            } = options;
126
127            IR::PythonScan {
128                options: PythonOptions {
129                    scan_fn,
130                    schema,
131                    python_source,
132                    validate_schema,
133                    output_schema: Default::default(),
134                    with_columns: Default::default(),
135                    n_rows: Default::default(),
136                    predicate: Default::default(),
137                    is_pure,
138                },
139            }
140        },
141        DslPlan::Union { inputs, args } => {
142            let mut inputs = inputs
143                .into_iter()
144                .map(|lp| to_alp_impl(lp, ctxt))
145                .collect::<PolarsResult<Vec<_>>>()
146                .map_err(|e| e.context(failed_here!(vertical concat)))?;
147
148            if args.diagonal {
149                inputs = concat::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;
150            }
151
152            if args.to_supertypes {
153                concat::convert_st_union(
154                    &mut inputs,
155                    ctxt.lp_arena,
156                    ctxt.expr_arena,
157                    ctxt.opt_flags,
158                )
159                .map_err(|e| e.context(failed_here!(vertical concat)))?;
160            }
161
162            let first = *inputs.first().ok_or_else(
163                || polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),
164            )?;
165            let schema = ctxt.lp_arena.get(first).schema(ctxt.lp_arena);
166            for n in &inputs[1..] {
167                let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);
168                // The first argument
169                schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation:  "'union'/'concat' inputs should all have the same schema,\
170                    got\n{:?} and \n{:?}", schema, schema_i)
171                )?;
172            }
173
174            let options = args.into();
175            IR::Union { inputs, options }
176        },
177        DslPlan::HConcat { inputs, options } => {
178            let inputs = inputs
179                .into_iter()
180                .map(|lp| to_alp_impl(lp, ctxt))
181                .collect::<PolarsResult<Vec<_>>>()
182                .map_err(|e| e.context(failed_here!(horizontal concat)))?;
183
184            let schema = concat::h_concat_schema(&inputs, ctxt.lp_arena)?;
185
186            IR::HConcat {
187                inputs,
188                schema,
189                options,
190            }
191        },
192        DslPlan::Filter { input, predicate } => {
193            let mut input =
194                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;
195            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
196
197            let mut out = Vec::with_capacity(1);
198            expr_expansion::expand_expression(
199                &predicate,
200                &PlHashSet::default(),
201                input_schema.as_ref().as_ref(),
202                &mut out,
203                ctxt.opt_flags,
204            )?;
205
206            let predicate = match out.len() {
207                1 => {
208                    // all good
209                    out.pop().unwrap()
210                },
211                0 => {
212                    let msg = "The predicate expanded to zero expressions. \
213                            This may for example be caused by a regex not matching column names or \
214                            a column dtype match not hitting any dtypes in the DataFrame";
215                    polars_bail!(ComputeError: msg);
216                },
217                _ => {
218                    let mut expanded = String::new();
219                    for e in out.iter().take(5) {
220                        expanded.push_str(&format!("\t{e:?},\n"))
221                    }
222                    // pop latest comma
223                    expanded.pop();
224                    if out.len() > 5 {
225                        expanded.push_str("\t...\n")
226                    }
227
228                    let msg = if cfg!(feature = "python") {
229                        format!(
230                            "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
231                                This is ambiguous. Try to combine the predicates with the 'all' or `any' expression."
232                        )
233                    } else {
234                        format!(
235                            "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
236                                This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression."
237                        )
238                    };
239                    polars_bail!(ComputeError: msg)
240                },
241            };
242            let predicate_ae = to_expr_ir(
243                predicate,
244                &mut ExprToIRContext::new_with_opt_eager(
245                    ctxt.expr_arena,
246                    &input_schema,
247                    ctxt.opt_flags,
248                ),
249            )?;
250
251            if ctxt.opt_flags.predicate_pushdown() {
252                ctxt.nodes_scratch.clear();
253
254                if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(
255                    predicate_ae.node(),
256                    ctxt.expr_arena,
257                    Some(ctxt.nodes_scratch),
258                    ctxt.pushdown_maintain_errors,
259                ) {
260                    let mut update_input = |predicate: Node| -> PolarsResult<()> {
261                        let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
262                        ctxt.conversion_optimizer
263                            .push_scratch(predicate.node(), ctxt.expr_arena);
264                        let lp = IR::Filter { input, predicate };
265                        input = run_conversion(lp, ctxt, "filter")?;
266
267                        Ok(())
268                    };
269
270                    // Pushables first, then fallible.
271
272                    for predicate in pushable {
273                        update_input(predicate)?;
274                    }
275
276                    if let Some(node) = fallible {
277                        update_input(node)?;
278                    }
279
280                    return Ok(input);
281                };
282            };
283
284            ctxt.conversion_optimizer
285                .push_scratch(predicate_ae.node(), ctxt.expr_arena);
286            let lp = IR::Filter {
287                input,
288                predicate: predicate_ae,
289            };
290            return run_conversion(lp, ctxt, "filter");
291        },
292        DslPlan::Slice { input, offset, len } => {
293            let input =
294                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;
295            IR::Slice { input, offset, len }
296        },
297        DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
298            df,
299            schema,
300            output_schema: None,
301        },
302        DslPlan::Select {
303            expr,
304            input,
305            options,
306        } => {
307            let input =
308                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
309            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
310            let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)
311                .map_err(|e| e.context(failed_here!(select)))?;
312
313            if exprs.is_empty() {
314                ctxt.lp_arena.replace(input, utils::empty_df());
315                return Ok(input);
316            }
317
318            let eirs = to_expr_irs(
319                exprs,
320                &mut ExprToIRContext::new_with_opt_eager(
321                    ctxt.expr_arena,
322                    &input_schema,
323                    ctxt.opt_flags,
324                ),
325            )?;
326            ctxt.conversion_optimizer
327                .fill_scratch(&eirs, ctxt.expr_arena);
328
329            let schema = Arc::new(schema);
330            let lp = IR::Select {
331                expr: eirs,
332                input,
333                schema,
334                options,
335            };
336
337            return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));
338        },
339        DslPlan::Sort {
340            input,
341            by_column,
342            slice,
343            mut sort_options,
344        } => {
345            let input =
346                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
347            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
348
349            // note: if given an Expr::Columns, count the individual cols
350            let n_by_exprs = if by_column.len() == 1 {
351                match &by_column[0] {
352                    Expr::Selector(s) => s.into_columns(&input_schema, &Default::default())?.len(),
353                    _ => 1,
354                }
355            } else {
356                by_column.len()
357            };
358            let n_desc = sort_options.descending.len();
359            polars_ensure!(
360                n_desc == n_by_exprs || n_desc == 1,
361                ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()
362            );
363            let n_nulls_last = sort_options.nulls_last.len();
364            polars_ensure!(
365                n_nulls_last == n_by_exprs || n_nulls_last == 1,
366                ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()
367            );
368
369            let mut expanded_cols = Vec::new();
370            let mut nulls_last = Vec::new();
371            let mut descending = Vec::new();
372
373            // note: nulls_last/descending need to be matched to expanded multi-output expressions.
374            // when one of nulls_last/descending has not been updated from the default (single
375            // value true/false), 'cycle' ensures that "by_column" iter is not truncated.
376            for (c, (&n, &d)) in by_column.into_iter().zip(
377                sort_options
378                    .nulls_last
379                    .iter()
380                    .cycle()
381                    .zip(sort_options.descending.iter().cycle()),
382            ) {
383                let exprs = utils::expand_expressions(
384                    input,
385                    vec![c],
386                    ctxt.lp_arena,
387                    ctxt.expr_arena,
388                    ctxt.opt_flags,
389                )
390                .map_err(|e| e.context(failed_here!(sort)))?;
391
392                nulls_last.extend(std::iter::repeat_n(n, exprs.len()));
393                descending.extend(std::iter::repeat_n(d, exprs.len()));
394                expanded_cols.extend(exprs);
395            }
396            sort_options.nulls_last = nulls_last;
397            sort_options.descending = descending;
398
399            ctxt.conversion_optimizer
400                .fill_scratch(&expanded_cols, ctxt.expr_arena);
401            let mut by_column = expanded_cols;
402
403            // Remove null columns in multi-columns sort
404            if by_column.len() > 1 {
405                let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
406
407                let mut null_columns = vec![];
408
409                for (i, c) in by_column.iter().enumerate() {
410                    if let DataType::Null = c.dtype(&input_schema, ctxt.expr_arena)? {
411                        null_columns.push(i);
412                    }
413                }
414                // All null columns, only take one.
415                if null_columns.len() == by_column.len() {
416                    by_column.truncate(1);
417                    sort_options.nulls_last.truncate(1);
418                    sort_options.descending.truncate(1);
419                }
420                // Remove the null columns
421                else if !null_columns.is_empty() {
422                    for i in null_columns.into_iter().rev() {
423                        by_column.remove(i);
424                        sort_options.nulls_last.remove(i);
425                        sort_options.descending.remove(i);
426                    }
427                }
428            }
429            if by_column.is_empty() {
430                return Ok(input);
431            };
432
433            let lp = IR::Sort {
434                input,
435                by_column,
436                slice,
437                sort_options,
438            };
439
440            return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));
441        },
442        DslPlan::Cache { input, id } => {
443            let input = match ctxt.seen_caches.get(&id) {
444                Some(input) => *input,
445                None => {
446                    let input = to_alp_impl(owned(input), ctxt)
447                        .map_err(|e| e.context(failed_here!(cache)))?;
448                    let seen_before = ctxt.seen_caches.insert(id, input);
449                    assert!(
450                        seen_before.is_none(),
451                        "Cache could not have been created in the mean time. That would make the DAG cyclic."
452                    );
453                    input
454                },
455            };
456
457            IR::Cache { input, id }
458        },
459        DslPlan::GroupBy {
460            input,
461            keys,
462            aggs,
463            apply,
464            maintain_order,
465            options,
466        } => {
467            let input =
468                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;
469
470            // Rolling + group-by sorts the whole table, so remove unneeded columns
471            if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {
472                ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)
473            }
474
475            let (keys, aggs, schema) = resolve_group_by(
476                input,
477                keys,
478                aggs,
479                &options,
480                ctxt.lp_arena,
481                ctxt.expr_arena,
482                ctxt.opt_flags,
483            )
484            .map_err(|e| e.context(failed_here!(group_by)))?;
485
486            let (apply, schema) = if let Some((apply, schema)) = apply {
487                (Some(apply), schema)
488            } else {
489                (None, schema)
490            };
491
492            ctxt.conversion_optimizer
493                .fill_scratch(&keys, ctxt.expr_arena);
494            ctxt.conversion_optimizer
495                .fill_scratch(&aggs, ctxt.expr_arena);
496
497            let lp = IR::GroupBy {
498                input,
499                keys,
500                aggs,
501                schema,
502                apply,
503                maintain_order,
504                options,
505            };
506
507            return run_conversion(lp, ctxt, "group_by")
508                .map_err(|e| e.context(failed_here!(group_by)));
509        },
510        DslPlan::Join {
511            input_left,
512            input_right,
513            left_on,
514            right_on,
515            predicates,
516            options,
517        } => {
518            return join::resolve_join(
519                Either::Left(input_left),
520                Either::Left(input_right),
521                left_on,
522                right_on,
523                predicates,
524                JoinOptionsIR::from(Arc::unwrap_or_clone(options)),
525                ctxt,
526            )
527            .map_err(|e| e.context(failed_here!(join)))
528            .map(|t| t.0);
529        },
530        DslPlan::HStack {
531            input,
532            exprs,
533            options,
534        } => {
535            let input = to_alp_impl(owned(input), ctxt)
536                .map_err(|e| e.context(failed_here!(with_columns)))?;
537            let (exprs, schema) =
538                resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)
539                    .map_err(|e| e.context(failed_here!(with_columns)))?;
540
541            ctxt.conversion_optimizer
542                .fill_scratch(&exprs, ctxt.expr_arena);
543            let lp = IR::HStack {
544                input,
545                exprs,
546                schema,
547                options,
548            };
549            return run_conversion(lp, ctxt, "with_columns");
550        },
551        DslPlan::MatchToSchema {
552            input,
553            match_schema,
554            per_column,
555            extra_columns,
556        } => {
557            let input =
558                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
559            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
560
561            assert_eq!(per_column.len(), match_schema.len());
562
563            if input_schema.as_ref() == &match_schema {
564                return Ok(input);
565            }
566
567            let mut exprs = Vec::with_capacity(match_schema.len());
568            let mut found_missing_columns = Vec::new();
569            let mut used_input_columns = 0;
570
571            for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {
572                match input_schema.get(column) {
573                    None => match &per_column.missing_columns {
574                        MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),
575                        MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(
576                            Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(
577                                dtype.clone(),
578                            )))),
579                            column.clone(),
580                        )),
581                        MissingColumnsPolicyOrExpr::InsertWith(expr) => {
582                            exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))
583                        },
584                    },
585                    Some(input_dtype) if dtype == input_dtype => {
586                        used_input_columns += 1;
587                        exprs.push(Expr::Column(column.clone()))
588                    },
589                    Some(input_dtype) => {
590                        let from_dtype = input_dtype;
591                        let to_dtype = dtype;
592
593                        let policy = CastColumnsPolicy {
594                            integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,
595                            float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,
596                            missing_struct_fields: per_column.missing_struct_fields,
597                            extra_struct_fields: per_column.extra_struct_fields,
598
599                            ..Default::default()
600                        };
601
602                        let should_cast =
603                            policy.should_cast_column(column, to_dtype, from_dtype)?;
604
605                        let mut expr = Expr::Column(PlSmallStr::from_str(column));
606                        if should_cast {
607                            expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);
608                        }
609
610                        used_input_columns += 1;
611                        exprs.push(expr);
612                    },
613                }
614            }
615
616            // Report the error for missing columns
617            if let Some(lst) = found_missing_columns.first() {
618                use std::fmt::Write;
619                let mut formatted = String::new();
620                write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();
621                for c in &found_missing_columns[1..] {
622                    write!(&mut formatted, ", \"{c}\"").unwrap();
623                }
624
625                write!(&mut formatted, "\"{lst}\"").unwrap();
626                polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");
627            }
628
629            // Report the error for extra columns
630            if used_input_columns != input_schema.len()
631                && extra_columns == ExtraColumnsPolicy::Raise
632            {
633                let found_extra_columns = input_schema
634                    .iter_names()
635                    .filter(|n| !match_schema.contains(n))
636                    .collect::<Vec<_>>();
637
638                use std::fmt::Write;
639                let mut formatted = String::new();
640                write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();
641                for c in &found_extra_columns[1..] {
642                    write!(&mut formatted, ", \"{c}\"").unwrap();
643                }
644
645                polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");
646            }
647
648            let exprs = to_expr_irs(
649                exprs,
650                &mut ExprToIRContext::new_with_opt_eager(
651                    ctxt.expr_arena,
652                    &input_schema,
653                    ctxt.opt_flags,
654                ),
655            )?;
656
657            ctxt.conversion_optimizer
658                .fill_scratch(&exprs, ctxt.expr_arena);
659            let lp = IR::Select {
660                input,
661                expr: exprs,
662                schema: match_schema.clone(),
663                options: ProjectionOptions {
664                    run_parallel: true,
665                    duplicate_check: false,
666                    should_broadcast: true,
667                },
668            };
669            return run_conversion(lp, ctxt, "match_to_schema");
670        },
671        DslPlan::PipeWithSchema { input, callback } => {
672            let input_owned = owned(input);
673
674            // Derive the schema from the input
675            let input = to_alp_impl(input_owned.clone(), ctxt)
676                .map_err(|e| e.context(failed_here!(pipe_with_schema)))?;
677            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
678
679            let input_owned = DslPlan::IR {
680                dsl: Arc::new(input_owned),
681                version: ctxt.lp_arena.version(),
682                node: Some(input),
683            };
684
685            // Adjust the input and start conversion again
686            let input_adjusted =
687                callback.call((input_owned, Arc::unwrap_or_clone(input_schema.into_owned())))?;
688            return to_alp_impl(input_adjusted, ctxt);
689        },
690        DslPlan::Distinct { input, options } => {
691            let input =
692                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
693            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
694
695            let subset = options
696                .subset
697                .map(|s| {
698                    PolarsResult::Ok(
699                        s.into_columns(input_schema.as_ref(), &Default::default())?
700                            .into_iter()
701                            .collect(),
702                    )
703                })
704                .transpose()?;
705
706            let options = DistinctOptionsIR {
707                subset,
708                maintain_order: options.maintain_order,
709                keep_strategy: options.keep_strategy,
710                slice: None,
711            };
712
713            IR::Distinct { input, options }
714        },
715        DslPlan::MapFunction { input, function } => {
716            let input = to_alp_impl(owned(input), ctxt)
717                .map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;
718            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
719
720            match function {
721                DslFunction::Explode {
722                    columns,
723                    allow_empty,
724                } => {
725                    let columns = columns.into_columns(&input_schema, &Default::default())?;
726                    polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");
727                    if columns.is_empty() {
728                        return Ok(input);
729                    }
730                    let function = FunctionIR::Explode {
731                        columns: columns.into_iter().collect(),
732                        schema: Default::default(),
733                    };
734                    let ir = IR::MapFunction { input, function };
735                    return Ok(ctxt.lp_arena.add(ir));
736                },
737                DslFunction::FillNan(fill_value) => {
738                    let exprs = input_schema
739                        .iter()
740                        .filter_map(|(name, dtype)| match dtype {
741                            DataType::Float32 | DataType::Float64 => Some(
742                                col(name.clone())
743                                    .fill_nan(fill_value.clone())
744                                    .alias(name.clone()),
745                            ),
746                            _ => None,
747                        })
748                        .collect::<Vec<_>>();
749
750                    let (exprs, schema) = resolve_with_columns(
751                        exprs,
752                        input,
753                        ctxt.lp_arena,
754                        ctxt.expr_arena,
755                        ctxt.opt_flags,
756                    )
757                    .map_err(|e| e.context(failed_here!(fill_nan)))?;
758
759                    ctxt.conversion_optimizer
760                        .fill_scratch(&exprs, ctxt.expr_arena);
761
762                    let lp = IR::HStack {
763                        input,
764                        exprs,
765                        schema,
766                        options: ProjectionOptions {
767                            duplicate_check: false,
768                            ..Default::default()
769                        },
770                    };
771                    return run_conversion(lp, ctxt, "fill_nan");
772                },
773                DslFunction::Stats(sf) => {
774                    let exprs = match sf {
775                        StatsFunction::Var { ddof } => stats_helper(
776                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
777                            |name| col(name.clone()).var(ddof),
778                            &input_schema,
779                        ),
780                        StatsFunction::Std { ddof } => stats_helper(
781                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
782                            |name| col(name.clone()).std(ddof),
783                            &input_schema,
784                        ),
785                        StatsFunction::Quantile { quantile, method } => stats_helper(
786                            |dt| dt.is_primitive_numeric(),
787                            |name| col(name.clone()).quantile(quantile.clone(), method),
788                            &input_schema,
789                        ),
790                        StatsFunction::Mean => stats_helper(
791                            |dt| {
792                                dt.is_primitive_numeric()
793                                    || dt.is_temporal()
794                                    || dt == &DataType::Boolean
795                            },
796                            |name| col(name.clone()).mean(),
797                            &input_schema,
798                        ),
799                        StatsFunction::Sum => stats_helper(
800                            |dt| {
801                                dt.is_primitive_numeric()
802                                    || dt.is_decimal()
803                                    || matches!(dt, DataType::Boolean | DataType::Duration(_))
804                            },
805                            |name| col(name.clone()).sum(),
806                            &input_schema,
807                        ),
808                        StatsFunction::Min => stats_helper(
809                            |dt| dt.is_ord(),
810                            |name| col(name.clone()).min(),
811                            &input_schema,
812                        ),
813                        StatsFunction::Max => stats_helper(
814                            |dt| dt.is_ord(),
815                            |name| col(name.clone()).max(),
816                            &input_schema,
817                        ),
818                        StatsFunction::Median => stats_helper(
819                            |dt| {
820                                dt.is_primitive_numeric()
821                                    || dt.is_temporal()
822                                    || dt == &DataType::Boolean
823                            },
824                            |name| col(name.clone()).median(),
825                            &input_schema,
826                        ),
827                    };
828                    let schema = Arc::new(expressions_to_schema(&exprs, &input_schema)?);
829                    let eirs = to_expr_irs(
830                        exprs,
831                        &mut ExprToIRContext::new_with_opt_eager(
832                            ctxt.expr_arena,
833                            &input_schema,
834                            ctxt.opt_flags,
835                        ),
836                    )?;
837
838                    ctxt.conversion_optimizer
839                        .fill_scratch(&eirs, ctxt.expr_arena);
840
841                    let lp = IR::Select {
842                        input,
843                        expr: eirs,
844                        schema,
845                        options: ProjectionOptions {
846                            duplicate_check: false,
847                            ..Default::default()
848                        },
849                    };
850                    return run_conversion(lp, ctxt, "stats");
851                },
852                DslFunction::Rename {
853                    existing,
854                    new,
855                    strict,
856                } => {
857                    assert_eq!(existing.len(), new.len());
858                    if existing.is_empty() {
859                        return Ok(input);
860                    }
861
862                    let existing_lut =
863                        PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));
864
865                    let mut schema = Schema::with_capacity(input_schema.len());
866                    let mut num_replaced = 0;
867
868                    // Turn the rename into a select.
869                    let expr = input_schema
870                        .iter()
871                        .map(|(n, dtype)| {
872                            Ok(match existing_lut.get_index_of(n.as_str()) {
873                                None => {
874                                    schema.try_insert(n.clone(), dtype.clone())?;
875                                    Expr::Column(n.clone())
876                                },
877                                Some(i) => {
878                                    num_replaced += 1;
879                                    schema.try_insert(new[i].clone(), dtype.clone())?;
880                                    Expr::Column(n.clone()).alias(new[i].clone())
881                                },
882                            })
883                        })
884                        .collect::<PolarsResult<Vec<_>>>()?;
885
886                    if strict && num_replaced != existing.len() {
887                        let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();
888                        polars_bail!(col_not_found = col);
889                    }
890
891                    // Nothing changed, make into a no-op.
892                    if num_replaced == 0 {
893                        return Ok(input);
894                    }
895
896                    let expr = to_expr_irs(
897                        expr,
898                        &mut ExprToIRContext::new_with_opt_eager(
899                            ctxt.expr_arena,
900                            &input_schema,
901                            ctxt.opt_flags,
902                        ),
903                    )?;
904                    ctxt.conversion_optimizer
905                        .fill_scratch(&expr, ctxt.expr_arena);
906
907                    IR::Select {
908                        input,
909                        expr,
910                        schema: Arc::new(schema),
911                        options: ProjectionOptions {
912                            run_parallel: false,
913                            duplicate_check: false,
914                            should_broadcast: false,
915                        },
916                    }
917                },
918                _ => {
919                    let function = function.into_function_ir(&input_schema)?;
920                    IR::MapFunction { input, function }
921                },
922            }
923        },
924        DslPlan::ExtContext { input, contexts } => {
925            let input = to_alp_impl(owned(input), ctxt)
926                .map_err(|e| e.context(failed_here!(with_context)))?;
927            let contexts = contexts
928                .into_iter()
929                .map(|lp| to_alp_impl(lp, ctxt))
930                .collect::<PolarsResult<Vec<_>>>()
931                .map_err(|e| e.context(failed_here!(with_context)))?;
932
933            let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
934            for input in &contexts {
935                let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
936                for fld in other_schema.iter_fields() {
937                    if schema.get(fld.name()).is_none() {
938                        schema.with_column(fld.name, fld.dtype);
939                    }
940                }
941            }
942
943            IR::ExtContext {
944                input,
945                contexts,
946                schema: Arc::new(schema),
947            }
948        },
949        DslPlan::Sink { input, payload } => {
950            let input =
951                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
952            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
953            let payload = match payload {
954                SinkType::Memory => SinkTypeIR::Memory,
955                SinkType::File(f) => SinkTypeIR::File(f),
956                SinkType::Partition(f) => SinkTypeIR::Partition(PartitionSinkTypeIR {
957                    base_path: f.base_path,
958                    file_path_cb: f.file_path_cb,
959                    file_type: f.file_type,
960                    sink_options: f.sink_options,
961                    variant: match f.variant {
962                        PartitionVariant::MaxSize(max_size) => {
963                            PartitionVariantIR::MaxSize(max_size)
964                        },
965                        PartitionVariant::Parted {
966                            key_exprs,
967                            include_key,
968                        } => {
969                            let eirs = to_expr_irs(
970                                key_exprs,
971                                &mut ExprToIRContext::new_with_opt_eager(
972                                    ctxt.expr_arena,
973                                    &input_schema,
974                                    ctxt.opt_flags,
975                                ),
976                            )?;
977                            ctxt.conversion_optimizer
978                                .fill_scratch(&eirs, ctxt.expr_arena);
979
980                            PartitionVariantIR::Parted {
981                                key_exprs: eirs,
982                                include_key,
983                            }
984                        },
985                        PartitionVariant::ByKey {
986                            key_exprs,
987                            include_key,
988                        } => {
989                            let eirs = to_expr_irs(
990                                key_exprs,
991                                &mut ExprToIRContext::new_with_opt_eager(
992                                    ctxt.expr_arena,
993                                    &input_schema,
994                                    ctxt.opt_flags,
995                                ),
996                            )?;
997                            ctxt.conversion_optimizer
998                                .fill_scratch(&eirs, ctxt.expr_arena);
999
1000                            PartitionVariantIR::ByKey {
1001                                key_exprs: eirs,
1002                                include_key,
1003                            }
1004                        },
1005                    },
1006                    cloud_options: f.cloud_options,
1007                    per_partition_sort_by: match f.per_partition_sort_by {
1008                        None => None,
1009                        Some(sort_by) => Some(
1010                            sort_by
1011                                .into_iter()
1012                                .map(|s| {
1013                                    let expr = to_expr_ir(
1014                                        s.expr,
1015                                        &mut ExprToIRContext::new_with_opt_eager(
1016                                            ctxt.expr_arena,
1017                                            &input_schema,
1018                                            ctxt.opt_flags,
1019                                        ),
1020                                    )?;
1021                                    ctxt.conversion_optimizer
1022                                        .push_scratch(expr.node(), ctxt.expr_arena);
1023                                    Ok(SortColumnIR {
1024                                        expr,
1025                                        descending: s.descending,
1026                                        nulls_last: s.nulls_last,
1027                                    })
1028                                })
1029                                .collect::<PolarsResult<Vec<_>>>()?,
1030                        ),
1031                    },
1032                    finish_callback: f.finish_callback,
1033                }),
1034            };
1035
1036            let lp = IR::Sink { input, payload };
1037            return run_conversion(lp, ctxt, "sink");
1038        },
1039        DslPlan::SinkMultiple { inputs } => {
1040            let inputs = inputs
1041                .into_iter()
1042                .map(|lp| to_alp_impl(lp, ctxt))
1043                .collect::<PolarsResult<Vec<_>>>()
1044                .map_err(|e| e.context(failed_here!(vertical concat)))?;
1045            IR::SinkMultiple { inputs }
1046        },
1047        #[cfg(feature = "merge_sorted")]
1048        DslPlan::MergeSorted {
1049            input_left,
1050            input_right,
1051            key,
1052        } => {
1053            let input_left = to_alp_impl(owned(input_left), ctxt)
1054                .map_err(|e| e.context(failed_here!(merge_sorted)))?;
1055            let input_right = to_alp_impl(owned(input_right), ctxt)
1056                .map_err(|e| e.context(failed_here!(merge_sorted)))?;
1057
1058            let left_schema = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
1059            let right_schema = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);
1060
1061            left_schema
1062                .ensure_is_exact_match(&right_schema)
1063                .map_err(|err| err.context("merge_sorted".into()))?;
1064
1065            left_schema
1066                .try_get(key.as_str())
1067                .map_err(|err| err.context("merge_sorted".into()))?;
1068
1069            IR::MergeSorted {
1070                input_left,
1071                input_right,
1072                key,
1073            }
1074        },
1075        DslPlan::IR { node, dsl, version } => {
1076            return match node {
1077                Some(node)
1078                    if version == ctxt.lp_arena.version()
1079                        && ctxt.conversion_optimizer.used_arenas.insert(version) =>
1080                {
1081                    Ok(node)
1082                },
1083                _ => to_alp_impl(owned(dsl), ctxt),
1084            };
1085        },
1086    };
1087    Ok(ctxt.lp_arena.add(v))
1088}
1089
1090fn resolve_with_columns(
1091    exprs: Vec<Expr>,
1092    input: Node,
1093    lp_arena: &Arena<IR>,
1094    expr_arena: &mut Arena<AExpr>,
1095    opt_flags: &mut OptFlags,
1096) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
1097    let input_schema = lp_arena.get(input).schema(lp_arena);
1098    let mut output_schema = (**input_schema).clone();
1099    let exprs = rewrite_projections(exprs, &PlHashSet::new(), &input_schema, opt_flags)?;
1100    let mut output_names = PlHashSet::with_capacity(exprs.len());
1101
1102    let eirs = to_expr_irs(
1103        exprs,
1104        &mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),
1105    )?;
1106    for eir in eirs.iter() {
1107        let field = eir.field(&input_schema, expr_arena)?;
1108
1109        if !output_names.insert(field.name().clone()) {
1110            let msg = format!(
1111                "the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\
1112                    It's possible that multiple expressions are returning the same default column name. \
1113                    If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
1114                    duplicate column names.",
1115                field.name()
1116            );
1117            polars_bail!(ComputeError: msg)
1118        }
1119        output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
1120    }
1121
1122    Ok((eirs, Arc::new(output_schema)))
1123}
1124
1125fn resolve_group_by(
1126    input: Node,
1127    keys: Vec<Expr>,
1128    aggs: Vec<Expr>,
1129    _options: &GroupbyOptions,
1130    lp_arena: &Arena<IR>,
1131    expr_arena: &mut Arena<AExpr>,
1132    opt_flags: &mut OptFlags,
1133) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
1134    let input_schema = lp_arena.get(input).schema(lp_arena);
1135    let input_schema = input_schema.as_ref();
1136    let mut keys = rewrite_projections(keys, &PlHashSet::default(), input_schema, opt_flags)?;
1137
1138    // Initialize schema from keys
1139    let mut output_schema = expressions_to_schema(&keys, input_schema)?;
1140    let mut key_names: PlHashSet<PlSmallStr> = output_schema.iter_names().cloned().collect();
1141
1142    #[allow(unused_mut)]
1143    let mut pop_keys = false;
1144    // Add dynamic groupby index column(s)
1145    // Also add index columns to keys for expression expansion.
1146    #[cfg(feature = "dynamic_group_by")]
1147    {
1148        if let Some(options) = _options.rolling.as_ref() {
1149            let name = options.index_column.clone();
1150            let dtype = input_schema.try_get(name.as_str())?;
1151            keys.push(col(name.clone()));
1152            key_names.insert(name.clone());
1153            pop_keys = true;
1154            output_schema.with_column(name.clone(), dtype.clone());
1155        } else if let Some(options) = _options.dynamic.as_ref() {
1156            let name = options.index_column.clone();
1157            keys.push(col(name.clone()));
1158            key_names.insert(name.clone());
1159            pop_keys = true;
1160            let dtype = input_schema.try_get(name.as_str())?;
1161            if options.include_boundaries {
1162                output_schema.with_column("_lower_boundary".into(), dtype.clone());
1163                output_schema.with_column("_upper_boundary".into(), dtype.clone());
1164            }
1165            output_schema.with_column(name.clone(), dtype.clone());
1166        }
1167    }
1168    let keys_index_len = output_schema.len();
1169    if pop_keys {
1170        let _ = keys.pop();
1171    }
1172    let keys = to_expr_irs(
1173        keys,
1174        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
1175    )?;
1176
1177    // Add aggregation column(s)
1178    let aggs = rewrite_projections(aggs, &key_names, input_schema, opt_flags)?;
1179    let aggs = to_expr_irs(
1180        aggs,
1181        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
1182    )?;
1183    utils::validate_expressions(&keys, expr_arena, input_schema, "group by")?;
1184    utils::validate_expressions(&aggs, expr_arena, input_schema, "group by")?;
1185
1186    let mut aggs_schema = expr_irs_to_schema(&aggs, input_schema, expr_arena);
1187
1188    // Make sure aggregation columns do not contain duplicates
1189    if aggs_schema.len() < aggs.len() {
1190        let mut names = PlHashSet::with_capacity(aggs.len());
1191        for agg in aggs.iter() {
1192            let name = agg.output_name();
1193            polars_ensure!(names.insert(name.clone()), duplicate = name)
1194        }
1195    }
1196
1197    // Coerce aggregation column(s) into List unless not needed (auto-implode)
1198    debug_assert!(aggs_schema.len() == aggs.len());
1199    for ((_name, dtype), expr) in aggs_schema.iter_mut().zip(&aggs) {
1200        if !expr.is_scalar(expr_arena) {
1201            *dtype = dtype.clone().implode();
1202        }
1203    }
1204
1205    // Final output_schema
1206    output_schema.merge(aggs_schema);
1207
1208    // Make sure aggregation columns do not contain keys or index columns
1209    if output_schema.len() < (keys_index_len + aggs.len()) {
1210        let mut names = PlHashSet::with_capacity(output_schema.len());
1211        for agg in aggs.iter().chain(keys.iter()) {
1212            let name = agg.output_name();
1213            polars_ensure!(names.insert(name.clone()), duplicate = name)
1214        }
1215    }
1216
1217    Ok((keys, aggs, Arc::new(output_schema)))
1218}
1219fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
1220where
1221    F: Fn(&DataType) -> bool,
1222    E: Fn(&PlSmallStr) -> Expr,
1223{
1224    schema
1225        .iter()
1226        .map(|(name, dt)| {
1227            if condition(dt) {
1228                expr(name)
1229            } else {
1230                lit(NULL).cast(dt.clone()).alias(name.clone())
1231            }
1232        })
1233        .collect()
1234}
1235
1236pub(crate) fn maybe_init_projection_excluding_hive(
1237    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
1238    hive_parts: Option<&SchemaRef>,
1239) -> Option<Arc<[PlSmallStr]>> {
1240    // Update `with_columns` with a projection so that hive columns aren't loaded from the
1241    // file
1242    let hive_schema = hive_parts?;
1243
1244    match &reader_schema {
1245        Either::Left(reader_schema) => hive_schema
1246            .iter_names()
1247            .any(|x| reader_schema.contains(x))
1248            .then(|| {
1249                reader_schema
1250                    .iter_names_cloned()
1251                    .filter(|x| !hive_schema.contains(x))
1252                    .collect::<Arc<[_]>>()
1253            }),
1254        Either::Right(reader_schema) => hive_schema
1255            .iter_names()
1256            .any(|x| reader_schema.contains(x))
1257            .then(|| {
1258                reader_schema
1259                    .iter_names_cloned()
1260                    .filter(|x| !hive_schema.contains(x))
1261                    .collect::<Arc<[_]>>()
1262            }),
1263    }
1264}