polars_plan/plans/conversion/dsl_to_ir.rs

use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::{is_regex_projection, rewrite_projections};
use hive::hive_partitions_from_paths;
use polars_core::chunked_array::cast::CastOptions;
use polars_utils::unique_id::UniqueId;

use super::convert_utils::SplitPredicates;
use super::stack_opt::ConversionOptimizer;
use super::*;
use crate::plans::conversion::expr_expansion::expand_selectors;

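/// Expands multi-output expressions (wildcards, selectors, regex projections)
/// against the input node's schema and converts the result to `ExprIR`s.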
fn expand_expressions(
    input: Node,
    exprs: Vec<Expr>,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<Vec<ExprIR>> {
    let schema = lp_arena.get(input).schema(lp_arena);
    let exprs = rewrite_projections(exprs, &schema, &[], opt_flags)?;
    to_expr_irs(exprs, expr_arena, &schema)
}

fn empty_df() -> IR {
    IR::DataFrameScan {
        df: Arc::new(Default::default()),
        schema: Arc::new(Default::default()),
        output_schema: None,
    }
}

fn validate_expression(
    node: Node,
    expr_arena: &Arena<AExpr>,
    input_schema: &Schema,
    operation_name: &str,
) -> PolarsResult<()> {
    let iter = aexpr_to_leaf_names_iter(node, expr_arena);
    validate_columns_in_input(iter, input_schema, operation_name)
}

fn validate_expressions<N: Into<Node>, I: IntoIterator<Item = N>>(
    nodes: I,
    expr_arena: &Arena<AExpr>,
    input_schema: &Schema,
    operation_name: &str,
) -> PolarsResult<()> {
    let nodes = nodes.into_iter();

    for node in nodes {
        validate_expression(node.into(), expr_arena, input_schema, operation_name)?
    }
    Ok(())
}

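// Formats the given tokens into a quoted context string, e.g.
// `failed_here!(parquet scan)` becomes "'parquet scan'". Used to annotate
// errors with the DSL node that was being resolved when conversion failed.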
macro_rules! failed_here {
    ($($t:tt)*) => {
        format!("'{}'", stringify!($($t)*)).into()
    }
}
pub(super) use failed_here;

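/// Converts a `DslPlan` into IR, returning the root node.
///
/// A minimal usage sketch (illustrative only; assumes you already have a
/// `DslPlan` in `plan` and fresh arenas):
///
/// ```ignore
/// let mut expr_arena = Arena::new();
/// let mut lp_arena = Arena::new();
/// let mut opt_flags = OptFlags::default();
/// let root = to_alp(plan, &mut expr_arena, &mut lp_arena, &mut opt_flags)?;
/// ```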
pub fn to_alp(
    lp: DslPlan,
    expr_arena: &mut Arena<AExpr>,
    lp_arena: &mut Arena<IR>,
    // Only `SIMPLIFY_EXPR`, `TYPE_COERCION`, `TYPE_CHECK` are respected.
    opt_flags: &mut OptFlags,
) -> PolarsResult<Node> {
    let conversion_optimizer = ConversionOptimizer::new(
        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
        opt_flags.contains(OptFlags::TYPE_COERCION),
        opt_flags.contains(OptFlags::TYPE_CHECK),
    );

    let mut ctxt = DslConversionContext {
        expr_arena,
        lp_arena,
        conversion_optimizer,
        opt_flags,
        nodes_scratch: &mut unitvec![],
        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
    };

    match to_alp_impl(lp, &mut ctxt) {
        Ok(out) => Ok(out),
        Err(err) => {
            if opt_flags.contains(OptFlags::EAGER) {
                // If we dispatched to the lazy engine from the eager API, we don't want to resolve
                // where in the query plan it went wrong. It is clear from the backtrace anyway.
                return Err(err.remove_context());
            };

            let Some(ir_until_then) = lp_arena.last_node() else {
                return Err(err);
            };

            let node_name = if let PolarsError::Context { msg, .. } = &err {
                msg
            } else {
                "THIS_NODE"
            };
            let plan = IRPlan::new(
                ir_until_then,
                std::mem::take(lp_arena),
                std::mem::take(expr_arena),
            );
            let location = format!("{}", plan.display());
            Err(err.wrap_msg(|msg| {
                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
            }))
        },
    }
}

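/// State threaded through the DSL-to-IR conversion: the expression and plan
/// arenas, the per-node conversion optimizer, and scratch space reused
/// between nodes.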
pub(super) struct DslConversionContext<'a> {
    pub(super) expr_arena: &'a mut Arena<AExpr>,
    pub(super) lp_arena: &'a mut Arena<IR>,
    pub(super) conversion_optimizer: ConversionOptimizer,
    pub(super) opt_flags: &'a mut OptFlags,
    pub(super) nodes_scratch: &'a mut UnitVec<Node>,
    pub(super) pushdown_maintain_errors: bool,
}

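/// Adds the IR node to the plan arena and runs the conversion optimizer
/// (expression simplification, type coercion, type checking) over the
/// expressions queued in its scratch space, attaching `name` to any error.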
pub(super) fn run_conversion(
    lp: IR,
    ctxt: &mut DslConversionContext,
    name: &str,
) -> PolarsResult<Node> {
    let lp_node = ctxt.lp_arena.add(lp);
    ctxt.conversion_optimizer
        .optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node)
        .map_err(|e| e.context(format!("'{name}' failed").into()))?;

    Ok(lp_node)
}

/// Converts a `DslPlan` (logical plan) to IR.
/// It adds expressions and logical plans to the respective arenas as it traverses the plan,
/// and finally returns the top node of the logical plan.
#[recursive]
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
    let owned = Arc::unwrap_or_clone;

    let v = match lp {
        DslPlan::Scan {
            sources,
            file_info,
            unified_scan_args: mut unified_scan_args_box,
            scan_type,
            cached_ir,
        } => {
            // Note that the first metadata can still end up being `None` later if the files were
            // filtered out by predicate pushdown.
            let mut cached_ir = cached_ir.lock().unwrap();

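            // Resolve the scan only once: the resolved IR is cached inside the
            // DSL plan, so converting the same plan again reuses it.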
            if cached_ir.is_none() {
                let cloud_options = unified_scan_args_box.cloud_options.clone();
                let cloud_options = cloud_options.as_ref();

                let unified_scan_args = unified_scan_args_box.as_mut();
                let mut scan_type = scan_type.clone();

                if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
                    match unified_scan_args.hive_options.enabled {
                        // Enable hive_partitioning if it is unspecified but a non-empty hive_schema was given
                        None if !hive_schema.is_empty() => {
                            unified_scan_args.hive_options.enabled = Some(true)
                        },
                        // hive_partitioning was explicitly disabled
                        Some(false) => polars_bail!(
                            ComputeError:
                            "a hive schema was given but hive_partitioning was disabled"
                        ),
                        Some(true) | None => {},
                    }
                }

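                // Expand the scan sources (e.g. globs and directories) into
                // concrete paths. For parquet and IPC this may also update
                // hive-partitioning information derived from those paths.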
                let sources =
                    match &*scan_type {
                        #[cfg(feature = "parquet")]
                        FileScan::Parquet { .. } => sources
                            .expand_paths_with_hive_update(unified_scan_args, cloud_options)?,
                        #[cfg(feature = "ipc")]
                        FileScan::Ipc { .. } => sources
                            .expand_paths_with_hive_update(unified_scan_args, cloud_options)?,
                        #[cfg(feature = "csv")]
                        FileScan::Csv { .. } => {
                            sources.expand_paths(unified_scan_args, cloud_options)?
                        },
                        #[cfg(feature = "json")]
                        FileScan::NDJson { .. } => {
                            sources.expand_paths(unified_scan_args, cloud_options)?
                        },
                        #[cfg(feature = "python")]
                        FileScan::PythonDataset { .. } => {
                            // There are a lot of places that short-circuit if the list of paths is
                            // empty, so we just give a dummy path here.
                            ScanSources::Paths(Arc::from(["dummy".into()]))
                        },
                        FileScan::Anonymous { .. } => sources,
                    };

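                // Resolve the schema (and any readily available metadata) per
                // file format.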
                let mut file_info = match &mut *scan_type {
                    #[cfg(feature = "parquet")]
                    FileScan::Parquet { options, metadata } => {
                        if let Some(schema) = &options.schema {
                            // We were passed a schema, we don't have to call `parquet_file_info`,
                            // but this does mean we don't have `row_estimation` and `first_metadata`.
                            FileInfo {
                                schema: schema.clone(),
                                reader_schema: Some(either::Either::Left(Arc::new(
                                    schema.to_arrow(CompatLevel::newest()),
                                ))),
                                row_estimation: (None, 0),
                            }
                        } else {
                            let (file_info, md) = scans::parquet_file_info(
                                &sources,
                                unified_scan_args.row_index.as_ref(),
                                cloud_options,
                            )
                            .map_err(|e| e.context(failed_here!(parquet scan)))?;

                            *metadata = md;
                            file_info
                        }
                    },
                    #[cfg(feature = "ipc")]
                    FileScan::Ipc { metadata, .. } => {
                        let (file_info, md) = scans::ipc_file_info(
                            &sources,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                        )
                        .map_err(|e| e.context(failed_here!(ipc scan)))?;
                        *metadata = Some(Arc::new(md));
                        file_info
                    },
                    #[cfg(feature = "csv")]
                    FileScan::Csv { options } => {
                        // TODO: This is a hack. We conditionally set `allow_missing_columns` to
                        // mimic existing behavior, but this should be taken from a user provided
                        // parameter instead.
                        if options.schema.is_some() && options.has_header {
                            unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                        }

                        scans::csv_file_info(
                            &sources,
                            unified_scan_args.row_index.as_ref(),
                            options,
                            cloud_options,
                        )
                        .map_err(|e| e.context(failed_here!(csv scan)))?
                    },
                    #[cfg(feature = "json")]
                    FileScan::NDJson { options } => scans::ndjson_file_info(
                        &sources,
                        unified_scan_args.row_index.as_ref(),
                        options,
                        cloud_options,
                    )
                    .map_err(|e| e.context(failed_here!(ndjson scan)))?,
                    #[cfg(feature = "python")]
                    FileScan::PythonDataset { dataset_object, .. } => {
                        if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                            polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                        }

                        let mut schema = dataset_object.schema()?;
                        let reader_schema = schema.clone();

                        if let Some(row_index) = &unified_scan_args.row_index {
                            insert_row_index_to_schema(
                                Arc::make_mut(&mut schema),
                                row_index.name.clone(),
                            )?;
                        }

                        FileInfo {
                            schema,
                            reader_schema: Some(either::Either::Right(reader_schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    },
                    FileScan::Anonymous { .. } => {
                        file_info.expect("FileInfo should be set for AnonymousScan")
                    },
                };

                if unified_scan_args.hive_options.enabled.is_none() {
                    // We expect this to be `Some(_)` after this point. If it hasn't been auto-enabled
                    // we explicitly set it to disabled.
                    unified_scan_args.hive_options.enabled = Some(false);
                }

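                // Infer hive partition values from the paths if hive
                // partitioning is enabled and a reader schema is available to
                // resolve against.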
                let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
                    && file_info.reader_schema.is_some()
                {
                    let paths = sources.as_paths().ok_or_else(|| {
                        polars_err!(nyi = "Hive-partitioning of in-memory buffers")
                    })?;

                    #[allow(unused_assignments)]
                    let mut owned = None;

                    hive_partitions_from_paths(
                        paths,
                        unified_scan_args.hive_options.hive_start_idx,
                        unified_scan_args.hive_options.schema.clone(),
                        match file_info.reader_schema.as_ref().unwrap() {
                            Either::Left(v) => {
                                owned = Some(Schema::from_arrow_schema(v.as_ref()));
                                owned.as_ref().unwrap()
                            },
                            Either::Right(v) => v.as_ref(),
                        },
                        unified_scan_args.hive_options.try_parse_dates,
                    )?
                } else {
                    None
                };

                if let Some(ref hive_parts) = hive_parts {
                    let hive_schema = hive_parts.schema();
                    file_info.update_schema_with_hive_schema(hive_schema.clone());
                } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
                    // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with an empty file
                    // list during path expansion. In this case we still want to return an empty DataFrame with this
                    // schema.
                    file_info.update_schema_with_hive_schema(hive_schema);
                }

                if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
                    let schema = Arc::make_mut(&mut file_info.schema);

                    if schema.contains(file_path_col) {
                        polars_bail!(
                            Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                            file_path_col
                        );
                    }

                    schema.insert_at_index(
                        schema.len(),
                        file_path_col.clone(),
                        DataType::String,
                    )?;
                }

                unified_scan_args.projection = if file_info.reader_schema.is_some() {
                    maybe_init_projection_excluding_hive(
                        file_info.reader_schema.as_ref().unwrap(),
                        hive_parts.as_ref().map(|h| h.schema()),
                    )
                } else {
                    None
                };

                if let Some(row_index) = &unified_scan_args.row_index {
                    let schema = Arc::make_mut(&mut file_info.schema);
                    *schema = schema
                        .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                        .unwrap();
                }

                let ir = if sources.is_empty() && !matches!(&*scan_type, FileScan::Anonymous { .. })
                {
                    IR::DataFrameScan {
                        df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                        schema: file_info.schema,
                        output_schema: None,
                    }
                } else {
                    let unified_scan_args = unified_scan_args_box;

                    IR::Scan {
                        sources,
                        file_info,
                        hive_parts,
                        predicate: None,
                        scan_type,
                        output_schema: None,
                        unified_scan_args,
                        id: Default::default(),
                    }
                };

                cached_ir.replace(ir);
            }

            cached_ir.clone().unwrap()
        },
        #[cfg(feature = "python")]
        DslPlan::PythonScan { mut options } => {
            let scan_fn = options.scan_fn.take();
            let schema = options.get_schema()?;
            IR::PythonScan {
                options: PythonOptions {
                    scan_fn,
                    schema,
                    python_source: options.python_source,
                    validate_schema: options.validate_schema,
                    output_schema: Default::default(),
                    with_columns: Default::default(),
                    n_rows: Default::default(),
                    predicate: Default::default(),
                },
            }
        },
        DslPlan::Union { inputs, args } => {
            let mut inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(vertical concat)))?;

            if args.diagonal {
                inputs =
                    convert_utils::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;
            }

            if args.to_supertypes {
                convert_utils::convert_st_union(&mut inputs, ctxt.lp_arena, ctxt.expr_arena)
                    .map_err(|e| e.context(failed_here!(vertical concat)))?;
            }

            let first = *inputs.first().ok_or_else(
                || polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),
            )?;
            let schema = ctxt.lp_arena.get(first).schema(ctxt.lp_arena);
            for n in &inputs[1..] {
                let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);
                // Check that this input's schema matches the schema of the first input.
                schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema, \
                    got\n{:?} and \n{:?}", schema, schema_i)
                )?;
            }

            let options = args.into();
            IR::Union { inputs, options }
        },
        DslPlan::HConcat { inputs, options } => {
            let inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(horizontal concat)))?;

            let schema = convert_utils::h_concat_schema(&inputs, ctxt.lp_arena)?;

            IR::HConcat {
                inputs,
                schema,
                options,
            }
        },
        DslPlan::Filter { input, predicate } => {
            let mut input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;
            let schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let predicate = expand_filter(predicate, input, ctxt.lp_arena, ctxt.opt_flags)
                .map_err(|e| e.context(failed_here!(filter)))?;

            let predicate_ae = to_expr_ir(predicate.clone(), ctxt.expr_arena, &schema)?;

            if ctxt.opt_flags.predicate_pushdown() {
                ctxt.nodes_scratch.clear();

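                // Try to split the predicate into freely pushable parts and an
                // optional fallible part. Each part becomes its own `Filter`
                // node; the pushable predicates are applied first so the
                // fallible one does not block them.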
                if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(
                    predicate_ae.node(),
                    ctxt.expr_arena,
                    Some(ctxt.nodes_scratch),
                    ctxt.pushdown_maintain_errors,
                ) {
                    let mut update_input = |predicate: Node| -> PolarsResult<()> {
                        let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
                        ctxt.conversion_optimizer
                            .push_scratch(predicate.node(), ctxt.expr_arena);
                        let lp = IR::Filter { input, predicate };
                        input = run_conversion(lp, ctxt, "filter")?;

                        Ok(())
                    };

                    // Pushables first, then fallible.

                    for predicate in pushable {
                        update_input(predicate)?;
                    }

                    if let Some(node) = fallible {
                        update_input(node)?;
                    }

                    return Ok(input);
                };
            };

            ctxt.conversion_optimizer
                .push_scratch(predicate_ae.node(), ctxt.expr_arena);
            let lp = IR::Filter {
                input,
                predicate: predicate_ae,
            };
            return run_conversion(lp, ctxt, "filter");
        },
        DslPlan::Slice { input, offset, len } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;
            IR::Slice { input, offset, len }
        },
        DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
            df,
            schema,
            output_schema: None,
        },
        DslPlan::Select {
            expr,
            input,
            options,
        } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)
                .map_err(|e| e.context(failed_here!(select)))?;

            if exprs.is_empty() {
                ctxt.lp_arena.replace(input, empty_df());
                return Ok(input);
            }

            let eirs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;
            ctxt.conversion_optimizer
                .fill_scratch(&eirs, ctxt.expr_arena);

            let schema = Arc::new(schema);
            let lp = IR::Select {
                expr: eirs,
                input,
                schema,
                options,
            };

            return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));
        },
        DslPlan::Sort {
            input,
            by_column,
            slice,
            mut sort_options,
        } => {
            // note: if given an Expr::Columns, count the individual cols
            let n_by_exprs = if by_column.len() == 1 {
                match &by_column[0] {
                    Expr::Columns(cols) => cols.len(),
                    _ => 1,
                }
            } else {
                by_column.len()
            };
            let n_desc = sort_options.descending.len();
            polars_ensure!(
                n_desc == n_by_exprs || n_desc == 1,
                ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()
            );
            let n_nulls_last = sort_options.nulls_last.len();
            polars_ensure!(
                n_nulls_last == n_by_exprs || n_nulls_last == 1,
                ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()
            );

            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sort)))?;

            let mut expanded_cols = Vec::new();
            let mut nulls_last = Vec::new();
            let mut descending = Vec::new();

            // note: nulls_last/descending need to be matched to expanded multi-output expressions.
            // when one of nulls_last/descending has not been updated from the default (single
            // value true/false), 'cycle' ensures that "by_column" iter is not truncated.
            for (c, (&n, &d)) in by_column.into_iter().zip(
                sort_options
                    .nulls_last
                    .iter()
                    .cycle()
                    .zip(sort_options.descending.iter().cycle()),
            ) {
                let exprs = expand_expressions(
                    input,
                    vec![c],
                    ctxt.lp_arena,
                    ctxt.expr_arena,
                    ctxt.opt_flags,
                )
                .map_err(|e| e.context(failed_here!(sort)))?;

                nulls_last.extend(std::iter::repeat_n(n, exprs.len()));
                descending.extend(std::iter::repeat_n(d, exprs.len()));
                expanded_cols.extend(exprs);
            }
            sort_options.nulls_last = nulls_last;
            sort_options.descending = descending;

            ctxt.conversion_optimizer
                .fill_scratch(&expanded_cols, ctxt.expr_arena);
            let mut by_column = expanded_cols;

            // Remove null columns in a multi-column sort
            if by_column.len() > 1 {
                let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

                let mut null_columns = vec![];

                for (i, c) in by_column.iter().enumerate() {
                    if let DataType::Null =
                        c.dtype(&input_schema, Context::Default, ctxt.expr_arena)?
                    {
                        null_columns.push(i);
                    }
                }
                // If all sort columns are null columns, keep only one.
                if null_columns.len() == by_column.len() {
                    by_column.truncate(1);
                    sort_options.nulls_last.truncate(1);
                    sort_options.descending.truncate(1);
                }
                // Remove the null columns
                else if !null_columns.is_empty() {
                    for i in null_columns.into_iter().rev() {
                        by_column.remove(i);
                        sort_options.nulls_last.remove(i);
                        sort_options.descending.remove(i);
                    }
                }
            };

            let lp = IR::Sort {
                input,
                by_column,
                slice,
                sort_options,
            };

            return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));
        },
        DslPlan::Cache { input } => {
            let id = UniqueId::from_arc(input.clone());
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(cache)))?;
            IR::Cache {
                input,
                id,
                cache_hits: crate::constants::UNLIMITED_CACHE,
            }
        },
        DslPlan::GroupBy {
            input,
            keys,
            aggs,
            apply,
            maintain_order,
            options,
        } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;

            // Rolling + group-by sorts the whole table, so remove unneeded columns
            if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {
                ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)
            }

            let (keys, aggs, schema) = resolve_group_by(
                input,
                keys,
                aggs,
                &options,
                ctxt.lp_arena,
                ctxt.expr_arena,
                ctxt.opt_flags,
            )
            .map_err(|e| e.context(failed_here!(group_by)))?;

            let (apply, schema) = if let Some((apply, schema)) = apply {
                (Some(apply), schema)
            } else {
                (None, schema)
            };

            ctxt.conversion_optimizer
                .fill_scratch(&keys, ctxt.expr_arena);
            ctxt.conversion_optimizer
                .fill_scratch(&aggs, ctxt.expr_arena);

            let lp = IR::GroupBy {
                input,
                keys,
                aggs,
                schema,
                apply,
                maintain_order,
                options,
            };

            return run_conversion(lp, ctxt, "group_by")
                .map_err(|e| e.context(failed_here!(group_by)));
        },
        DslPlan::Join {
            input_left,
            input_right,
            left_on,
            right_on,
            predicates,
            options,
        } => {
            return join::resolve_join(
                Either::Left(input_left),
                Either::Left(input_right),
                left_on,
                right_on,
                predicates,
                JoinOptionsIR::from(Arc::unwrap_or_clone(options)),
                ctxt,
            )
            .map_err(|e| e.context(failed_here!(join)))
            .map(|t| t.0);
        },
        DslPlan::HStack {
            input,
            exprs,
            options,
        } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(with_columns)))?;
            let (exprs, schema) =
                resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)
                    .map_err(|e| e.context(failed_here!(with_columns)))?;

            ctxt.conversion_optimizer
                .fill_scratch(&exprs, ctxt.expr_arena);
            let lp = IR::HStack {
                input,
                exprs,
                schema,
                options,
            };
            return run_conversion(lp, ctxt, "with_columns");
        },
        DslPlan::MatchToSchema {
            input,
            match_schema,
            per_column,
            extra_columns,
        } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(match_to_schema)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            assert_eq!(per_column.len(), match_schema.len());

            if input_schema.as_ref() == &match_schema {
                return Ok(input);
            }

            let mut exprs = Vec::with_capacity(match_schema.len());
            let mut found_missing_columns = Vec::new();
            let mut used_input_columns = 0;

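            // For every column in the target schema: if it is missing from the
            // input, apply the per-column missing-columns policy; if the
            // dtypes already match, pass the column through; otherwise cast
            // according to the per-column cast policy.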
            for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {
                match input_schema.get(column) {
                    None => match &per_column.missing_columns {
                        MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),
                        MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(
                            Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(
                                dtype.clone(),
                            )))),
                            column.clone(),
                        )),
                        MissingColumnsPolicyOrExpr::InsertWith(expr) => {
                            exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))
                        },
                    },
                    Some(input_dtype) if dtype == input_dtype => {
                        used_input_columns += 1;
                        exprs.push(Expr::Column(column.clone()))
                    },
                    Some(input_dtype) => {
                        let from_dtype = input_dtype;
                        let to_dtype = dtype;

                        let policy = CastColumnsPolicy {
                            integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,
                            float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,
                            float_downcast: false,
                            datetime_nanoseconds_downcast: false,
                            datetime_microseconds_downcast: false,
                            datetime_convert_timezone: false,
                            missing_struct_fields: per_column.missing_struct_fields,
                            extra_struct_fields: per_column.extra_struct_fields,
                        };

                        let should_cast =
                            policy.should_cast_column(column, to_dtype, from_dtype)?;

                        let mut expr = Expr::Column(PlSmallStr::from_str(column));
                        if should_cast {
                            expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);
                        }

                        used_input_columns += 1;
                        exprs.push(expr);
                    },
                }
            }

            // Report the error for missing columns
            if let Some(lst) = found_missing_columns.last() {
                use std::fmt::Write;
                let mut formatted = String::new();
                for c in &found_missing_columns[..found_missing_columns.len() - 1] {
                    write!(&mut formatted, "\"{c}\", ").unwrap();
                }
                write!(&mut formatted, "\"{lst}\"").unwrap();
                polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");
            }

            // Report the error for extra columns
            if used_input_columns != input_schema.len()
                && extra_columns == ExtraColumnsPolicy::Raise
            {
                let found_extra_columns = input_schema
                    .iter_names()
                    .filter(|n| !match_schema.contains(n))
                    .collect::<Vec<_>>();

                use std::fmt::Write;
                let mut formatted = String::new();
                write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();
                for c in &found_extra_columns[1..] {
                    write!(&mut formatted, ", \"{c}\"").unwrap();
                }

                polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");
            }

            let exprs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;

            ctxt.conversion_optimizer
                .fill_scratch(&exprs, ctxt.expr_arena);
            let lp = IR::Select {
                input,
                expr: exprs,
                schema: match_schema.clone(),
                options: ProjectionOptions {
                    run_parallel: true,
                    duplicate_check: false,
                    should_broadcast: true,
                },
            };
            return run_conversion(lp, ctxt, "match_to_schema");
        },
        DslPlan::Distinct { input, options } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            let subset = options
                .subset
                .map(|s| {
                    let cols = expand_selectors(s, input_schema.as_ref(), &[])?;

                    // Check that the subset columns exist in the DataFrame.
                    for col in cols.iter() {
                        let _ = input_schema
                            .try_get(col)
                            .map_err(|_| polars_err!(col_not_found = col))?;
                    }

                    Ok::<_, PolarsError>(cols)
                })
                .transpose()?;

            let options = DistinctOptionsIR {
                subset,
                maintain_order: options.maintain_order,
                keep_strategy: options.keep_strategy,
                slice: None,
            };

            IR::Distinct { input, options }
        },
        DslPlan::MapFunction { input, function } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            match function {
                DslFunction::Explode {
                    columns,
                    allow_empty,
                } => {
                    let columns = expand_selectors(columns, &input_schema, &[])?;
                    validate_columns_in_input(columns.as_ref(), &input_schema, "explode")?;
                    polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");
                    if columns.is_empty() {
                        return Ok(input);
                    }
                    let function = FunctionIR::Explode {
                        columns,
                        schema: Default::default(),
                    };
                    let ir = IR::MapFunction { input, function };
                    return Ok(ctxt.lp_arena.add(ir));
                },
                DslFunction::FillNan(fill_value) => {
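                    // `fill_nan` is resolved as a `with_columns` over all
                    // float columns, replacing NaNs with the fill value.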
                    let exprs = input_schema
                        .iter()
                        .filter_map(|(name, dtype)| match dtype {
                            DataType::Float32 | DataType::Float64 => Some(
                                col(name.clone())
                                    .fill_nan(fill_value.clone())
                                    .alias(name.clone()),
                            ),
                            _ => None,
                        })
                        .collect::<Vec<_>>();

                    let (exprs, schema) = resolve_with_columns(
                        exprs,
                        input,
                        ctxt.lp_arena,
                        ctxt.expr_arena,
                        ctxt.opt_flags,
                    )
                    .map_err(|e| e.context(failed_here!(fill_nan)))?;

                    ctxt.conversion_optimizer
                        .fill_scratch(&exprs, ctxt.expr_arena);

                    let lp = IR::HStack {
                        input,
                        exprs,
                        schema,
                        options: ProjectionOptions {
                            duplicate_check: false,
                            ..Default::default()
                        },
                    };
                    return run_conversion(lp, ctxt, "fill_nan");
                },
                DslFunction::Drop(DropFunction { to_drop, strict }) => {
                    let to_drop = expand_selectors(to_drop, &input_schema, &[])?;
                    let to_drop = to_drop.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>();

                    if strict {
                        for col_name in to_drop.iter() {
                            polars_ensure!(
                                input_schema.contains(col_name),
                                col_not_found = col_name
                            );
                        }
                    }

                    let mut output_schema =
                        Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len()));

                    for (col_name, dtype) in input_schema.iter() {
                        if !to_drop.contains(col_name.as_str()) {
                            output_schema.with_column(col_name.clone(), dtype.clone());
                        }
                    }

                    if output_schema.is_empty() {
                        ctxt.lp_arena.replace(input, empty_df());
                    }

                    IR::SimpleProjection {
                        input,
                        columns: Arc::new(output_schema),
                    }
                },
                DslFunction::Stats(sf) => {
                    let exprs = match sf {
                        StatsFunction::Var { ddof } => stats_helper(
                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
                            |name| col(name.clone()).var(ddof),
                            &input_schema,
                        ),
                        StatsFunction::Std { ddof } => stats_helper(
                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
                            |name| col(name.clone()).std(ddof),
                            &input_schema,
                        ),
                        StatsFunction::Quantile { quantile, method } => stats_helper(
                            |dt| dt.is_primitive_numeric(),
                            |name| col(name.clone()).quantile(quantile.clone(), method),
                            &input_schema,
                        ),
                        StatsFunction::Mean => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_temporal()
                                    || dt == &DataType::Boolean
                            },
                            |name| col(name.clone()).mean(),
                            &input_schema,
                        ),
                        StatsFunction::Sum => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_decimal()
                                    || matches!(dt, DataType::Boolean | DataType::Duration(_))
                            },
                            |name| col(name.clone()).sum(),
                            &input_schema,
                        ),
                        StatsFunction::Min => stats_helper(
                            |dt| dt.is_ord(),
                            |name| col(name.clone()).min(),
                            &input_schema,
                        ),
                        StatsFunction::Max => stats_helper(
                            |dt| dt.is_ord(),
                            |name| col(name.clone()).max(),
                            &input_schema,
                        ),
                        StatsFunction::Median => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_temporal()
                                    || dt == &DataType::Boolean
                            },
                            |name| col(name.clone()).median(),
                            &input_schema,
                        ),
                    };
                    let schema = Arc::new(expressions_to_schema(
                        &exprs,
                        &input_schema,
                        Context::Default,
                    )?);
                    let eirs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;

                    ctxt.conversion_optimizer
                        .fill_scratch(&eirs, ctxt.expr_arena);

                    let lp = IR::Select {
                        input,
                        expr: eirs,
                        schema,
                        options: ProjectionOptions {
                            duplicate_check: false,
                            ..Default::default()
                        },
                    };
                    return run_conversion(lp, ctxt, "stats");
                },
                DslFunction::Rename {
                    existing,
                    new,
                    strict,
                } => {
                    assert_eq!(existing.len(), new.len());
                    if existing.is_empty() {
                        return Ok(input);
                    }

                    let existing_lut =
                        PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));

                    let mut schema = Schema::with_capacity(input_schema.len());
                    let mut num_replaced = 0;

                    // Turn the rename into a select.
                    let expr = input_schema
                        .iter()
                        .map(|(n, dtype)| {
                            Ok(match existing_lut.get_index_of(n.as_str()) {
                                None => {
                                    schema.try_insert(n.clone(), dtype.clone())?;
                                    Expr::Column(n.clone())
                                },
                                Some(i) => {
                                    num_replaced += 1;
                                    schema.try_insert(new[i].clone(), dtype.clone())?;
                                    Expr::Column(n.clone()).alias(new[i].clone())
                                },
                            })
                        })
                        .collect::<PolarsResult<Vec<_>>>()?;

                    if strict && num_replaced != existing.len() {
                        let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();
                        polars_bail!(col_not_found = col);
                    }

                    // Nothing changed, make into a no-op.
                    if num_replaced == 0 {
                        return Ok(input);
                    }

                    let expr = to_expr_irs(expr, ctxt.expr_arena, &input_schema)?;
                    ctxt.conversion_optimizer
                        .fill_scratch(&expr, ctxt.expr_arena);

                    IR::Select {
                        input,
                        expr,
                        schema: Arc::new(schema),
                        options: ProjectionOptions {
                            run_parallel: false,
                            duplicate_check: false,
                            should_broadcast: false,
                        },
                    }
                },
                _ => {
                    let function = function.into_function_ir(&input_schema)?;
                    IR::MapFunction { input, function }
                },
            }
        },
        DslPlan::ExtContext { input, contexts } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(with_context)))?;
            let contexts = contexts
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(with_context)))?;

            let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
            for input in &contexts {
                let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
                for fld in other_schema.iter_fields() {
                    if schema.get(fld.name()).is_none() {
                        schema.with_column(fld.name, fld.dtype);
                    }
                }
            }

            IR::ExtContext {
                input,
                contexts,
                schema: Arc::new(schema),
            }
        },
        DslPlan::Sink { input, payload } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let payload = match payload {
                SinkType::Memory => SinkTypeIR::Memory,
                SinkType::File(f) => SinkTypeIR::File(f),
                SinkType::Partition(f) => SinkTypeIR::Partition(PartitionSinkTypeIR {
                    base_path: f.base_path,
                    file_path_cb: f.file_path_cb,
                    file_type: f.file_type,
                    sink_options: f.sink_options,
                    variant: match f.variant {
                        PartitionVariant::MaxSize(max_size) => {
                            PartitionVariantIR::MaxSize(max_size)
                        },
                        PartitionVariant::Parted {
                            key_exprs,
                            include_key,
                        } => {
                            let eirs = to_expr_irs(key_exprs, ctxt.expr_arena, &input_schema)?;
                            ctxt.conversion_optimizer
                                .fill_scratch(&eirs, ctxt.expr_arena);

                            PartitionVariantIR::Parted {
                                key_exprs: eirs,
                                include_key,
                            }
                        },
                        PartitionVariant::ByKey {
                            key_exprs,
                            include_key,
                        } => {
                            let eirs = to_expr_irs(key_exprs, ctxt.expr_arena, &input_schema)?;
                            ctxt.conversion_optimizer
                                .fill_scratch(&eirs, ctxt.expr_arena);

                            PartitionVariantIR::ByKey {
                                key_exprs: eirs,
                                include_key,
                            }
                        },
                    },
                    cloud_options: f.cloud_options,
                    per_partition_sort_by: match f.per_partition_sort_by {
                        None => None,
                        Some(sort_by) => Some(
                            sort_by
                                .into_iter()
                                .map(|s| {
                                    let expr = to_expr_ir(s.expr, ctxt.expr_arena, &input_schema)?;
                                    ctxt.conversion_optimizer
                                        .push_scratch(expr.node(), ctxt.expr_arena);
                                    Ok(SortColumnIR {
                                        expr,
                                        descending: s.descending,
                                        nulls_last: s.nulls_last,
                                    })
                                })
                                .collect::<PolarsResult<Vec<_>>>()?,
                        ),
                    },
                    finish_callback: f.finish_callback,
                }),
            };

            let lp = IR::Sink { input, payload };
            return run_conversion(lp, ctxt, "sink");
        },
        DslPlan::SinkMultiple { inputs } => {
            let inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(sink multiple)))?;
            IR::SinkMultiple { inputs }
        },
        #[cfg(feature = "merge_sorted")]
        DslPlan::MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let input_left = to_alp_impl(owned(input_left), ctxt)
                .map_err(|e| e.context(failed_here!(merge_sorted)))?;
            let input_right = to_alp_impl(owned(input_right), ctxt)
                .map_err(|e| e.context(failed_here!(merge_sorted)))?;

            IR::MergeSorted {
                input_left,
                input_right,
                key,
            }
        },

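        // Reuse the cached node only if it was created against this arena
        // version and this arena has not already been claimed by another
        // conversion; otherwise convert the stored DSL from scratch.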
        DslPlan::IR { node, dsl, version } => {
            return if node.is_some()
                && version == ctxt.lp_arena.version()
                && ctxt.conversion_optimizer.used_arenas.insert(version)
            {
                Ok(node.unwrap())
            } else {
                to_alp_impl(owned(dsl), ctxt)
            };
        },
    };
    Ok(ctxt.lp_arena.add(v))
}

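/// Expands a filter predicate that may contain multi-output expressions
/// (wildcards, selectors, regex projections, dtype columns) and validates
/// that it expands to exactly one expression whose leaf columns exist in the
/// input schema.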
1262fn expand_filter(
1263    predicate: Expr,
1264    input: Node,
1265    lp_arena: &Arena<IR>,
1266    opt_flags: &mut OptFlags,
1267) -> PolarsResult<Expr> {
1268    let schema = lp_arena.get(input).schema(lp_arena);
1269    let predicate = if has_expr(&predicate, |e| match e {
1270        Expr::Column(name) => is_regex_projection(name),
1271        Expr::Wildcard
1272        | Expr::Selector(_)
1273        | Expr::RenameAlias { .. }
1274        | Expr::Columns(_)
1275        | Expr::DtypeColumn(_)
1276        | Expr::IndexColumn(_)
1277        | Expr::Nth(_) => true,
1278        #[cfg(feature = "dtype-struct")]
1279        Expr::Function {
1280            function: FunctionExpr::StructExpr(StructFunction::FieldByIndex(_)),
1281            ..
1282        } => true,
1283        _ => false,
1284    }) {
1285        let mut rewritten = rewrite_projections(vec![predicate], &schema, &[], opt_flags)?;
1286        match rewritten.len() {
1287            1 => {
1288                // all good
1289                rewritten.pop().unwrap()
1290            },
1291            0 => {
                let msg = "The predicate expanded to zero expressions. \
                        This may, for example, be caused by a regex that matches no column names or \
                        by a dtype selector that matches no dtypes in the DataFrame";
                polars_bail!(ComputeError: msg);
            },
            _ => {
                let mut expanded = String::new();
                for e in rewritten.iter().take(5) {
                    expanded.push_str(&format!("\t{e:?},\n"))
                }
                // pop the trailing newline
                expanded.pop();
                if rewritten.len() > 5 {
                    expanded.push_str("\t...\n")
                }

                let msg = if cfg!(feature = "python") {
                    format!(
                        "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
                            This is ambiguous. Try to combine the predicates with the 'all' or 'any' expression."
                    )
                } else {
                    format!(
                        "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
                            This is ambiguous. Try to combine the predicates with the 'all_horizontal' or 'any_horizontal' expression."
                    )
                };
                polars_bail!(ComputeError: msg)
            },
        }
    } else {
        predicate
    };
    expr_to_leaf_column_names_iter(&predicate)
        .try_for_each(|c| schema.try_index_of(&c).and(Ok(())))?;

    Ok(predicate)
}

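/// Resolve the expressions of a `with_columns` node: expand selectors,
/// reject duplicate output names, and extend the input schema with the
/// resulting columns.
///
/// Rough sketch of the duplicate check (hypothetical expressions, not a
/// doctest): `with_columns([col("a").sum(), col("a").max()])` gives both
/// outputs the default name `a`, which is rejected here with a hint to
/// disambiguate via `.alias("new_name")`.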
fn resolve_with_columns(
    exprs: Vec<Expr>,
    input: Node,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let mut output_schema = (**input_schema).clone();
    let (exprs, _) = prepare_projection(exprs, &input_schema, opt_flags)?;
    let mut output_names = PlHashSet::with_capacity(exprs.len());

    let mut arena = Arena::with_capacity(8);
    for e in &exprs {
        let field = e
            .to_field_amortized(&input_schema, Context::Default, &mut arena)
            .unwrap();

        if !output_names.insert(field.name().clone()) {
            let msg = format!(
                "the name '{}' passed to `LazyFrame.with_columns` is a duplicate\n\n\
                    It's possible that multiple expressions are returning the same default column name. \
                    If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
                    duplicate column names.",
                field.name()
            );
            polars_bail!(ComputeError: msg)
        }
        output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
        arena.clear();
    }

    let eirs = to_expr_irs(exprs, expr_arena, &input_schema)?;
    Ok((eirs, Arc::new(output_schema)))
}

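/// Resolve the key and aggregation expressions of a group-by node and build
/// its output schema: keys first, then any rolling/dynamic index (and
/// boundary) columns, then the aggregations.
///
/// Sketch of the duplicate check this performs (hypothetical names, not a
/// doctest): `group_by([col("k")]).agg([col("k").sum()])` would yield two
/// columns named `k`, which is rejected as a duplicate.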
fn resolve_group_by(
    input: Node,
    keys: Vec<Expr>,
    aggs: Vec<Expr>,
    _options: &GroupbyOptions,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let input_schema = input_schema.as_ref();
    let mut keys = rewrite_projections(keys, input_schema, &[], opt_flags)?;

    // Initialize the output schema from the keys
    let mut output_schema = expressions_to_schema(&keys, input_schema, Context::Default)?;

    #[allow(unused_mut)]
    let mut pop_keys = false;
    // Add dynamic group-by index column(s), and also add them to `keys` so
    // that expression expansion can see them.
    #[cfg(feature = "dynamic_group_by")]
    {
        if let Some(options) = _options.rolling.as_ref() {
            let name = options.index_column.clone();
            let dtype = input_schema.try_get(name.as_str())?;
            keys.push(col(name.clone()));
            pop_keys = true;
            output_schema.with_column(name.clone(), dtype.clone());
        } else if let Some(options) = _options.dynamic.as_ref() {
            let name = options.index_column.clone();
            keys.push(col(name.clone()));
            pop_keys = true;
            let dtype = input_schema.try_get(name.as_str())?;
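            // With `include_boundaries`, the window bounds are exposed as
            // extra `_lower_boundary`/`_upper_boundary` columns typed like
            // the index column.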
            if options.include_boundaries {
                output_schema.with_column("_lower_boundary".into(), dtype.clone());
                output_schema.with_column("_upper_boundary".into(), dtype.clone());
            }
            output_schema.with_column(name.clone(), dtype.clone());
        }
    }
    let keys_index_len = output_schema.len();

    let aggs = rewrite_projections(aggs, input_schema, &keys, opt_flags)?;
    if pop_keys {
        let _ = keys.pop();
    }

    // Add aggregation column(s)
    let aggs_schema = expressions_to_schema(&aggs, input_schema, Context::Aggregation)?;
    output_schema.merge(aggs_schema);

    // Make sure aggregation columns do not clash with keys or index columns
    if output_schema.len() < (keys_index_len + aggs.len()) {
        let mut names = PlHashSet::with_capacity(output_schema.len());
        for expr in aggs.iter().chain(keys.iter()) {
            let name = expr_output_name(expr)?;
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }
    let keys = to_expr_irs(keys, expr_arena, input_schema)?;
    let aggs = to_expr_irs(aggs, expr_arena, input_schema)?;
    validate_expressions(&keys, expr_arena, input_schema, "group by")?;
    validate_expressions(&aggs, expr_arena, input_schema, "group by")?;

    Ok((keys, aggs, Arc::new(output_schema)))
}
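
/// Build one expression per column in `schema`: `expr(name)` where
/// `condition(dtype)` holds, otherwise a typed `NULL` literal aliased to the
/// column name, so every input column is represented in the output.
///
/// A minimal sketch of a caller (the predicate and aggregation are
/// hypothetical, not a doctest):
///
/// ```ignore
/// // Mean of every numeric column; typed nulls keep the rest in the schema.
/// let exprs = stats_helper(
///     |dt| dt.is_numeric(),
///     |name| col(name.clone()).mean(),
///     &schema,
/// );
/// ```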
fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
where
    F: Fn(&DataType) -> bool,
    E: Fn(&PlSmallStr) -> Expr,
{
    schema
        .iter()
        .map(|(name, dt)| {
            if condition(dt) {
                expr(name)
            } else {
                lit(NULL).cast(dt.clone()).alias(name.clone())
            }
        })
        .collect()
}

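/// When hive-partition columns also exist in the file itself, return a
/// projection containing only the non-hive reader columns so those
/// duplicates are not loaded from the file; return `None` if there is no
/// overlap (or no hive schema at all).
///
/// Sketch (hypothetical schemas): file columns `[a, b, date]` with hive
/// schema `[date]` yield the projection `[a, b]`; hive schema `[year]`
/// with no `year` column in the file yields `None`.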
pub(crate) fn maybe_init_projection_excluding_hive(
    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
    hive_parts: Option<&SchemaRef>,
) -> Option<Arc<[PlSmallStr]>> {
    // Update `with_columns` with a projection so that hive columns aren't
    // loaded from the file.
    let hive_schema = hive_parts?;

    // The two arms are identical except for the concrete schema type
    // (`ArrowSchemaRef` vs `SchemaRef`).
    match &reader_schema {
        Either::Left(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
        Either::Right(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
    }
}