1use arrow::datatypes::ArrowSchemaRef;
2use either::Either;
3use expr_expansion::rewrite_projections;
4use hive::hive_partitions_from_paths;
5use polars_core::chunked_array::cast::CastOptions;
6use polars_core::config::verbose;
7use polars_utils::plpath::PlPath;
8use polars_utils::unique_id::UniqueId;
9
10use super::convert_utils::SplitPredicates;
11use super::stack_opt::ConversionOptimizer;
12use super::*;
13
14mod concat;
15mod datatype_fn_to_ir;
16mod expr_expansion;
17mod expr_to_ir;
18mod functions;
19mod join;
20mod scans;
21mod utils;
22pub use expr_expansion::{expand_expression, is_regex_projection, prepare_projection};
23pub use expr_to_ir::{ExprToIRContext, to_expr_ir};
24use expr_to_ir::{to_expr_ir_materialized_lit, to_expr_irs};
25use utils::DslConversionContext;
26
/// Produces an error-context label from the given tokens by stringifying
/// them and wrapping the result in single quotes,
/// e.g. `failed_here!(vertical concat)` -> `'vertical concat'`.
/// The `.into()` converts the `String` into the context type expected by
/// `PolarsError::context`.
macro_rules! failed_here {
    ($($t:tt)*) => {
        format!("'{}'", stringify!($($t)*)).into()
    }
}
pub(super) use failed_here;
33
/// Converts a [`DslPlan`] into the arena-based [`IR`] representation,
/// returning the root [`Node`].
///
/// Builds a [`DslConversionContext`] (with a [`ConversionOptimizer`]
/// configured from `opt_flags`) and delegates to [`to_alp_impl`]. On
/// failure, the error is enriched with a rendering of the partially
/// resolved plan so the user can see where conversion stopped.
pub fn to_alp(
    lp: DslPlan,
    expr_arena: &mut Arena<AExpr>,
    lp_arena: &mut Arena<IR>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<Node> {
    let conversion_optimizer = ConversionOptimizer::new(
        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
        opt_flags.contains(OptFlags::TYPE_COERCION),
        opt_flags.contains(OptFlags::TYPE_CHECK),
    );

    let mut ctxt = DslConversionContext {
        expr_arena,
        lp_arena,
        conversion_optimizer,
        opt_flags,
        nodes_scratch: &mut unitvec![],
        cache_file_info: Default::default(),
        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
        verbose: verbose(),
        seen_caches: Default::default(),
    };

    match to_alp_impl(lp, &mut ctxt) {
        Ok(out) => Ok(out),
        Err(err) => {
            // In eager mode the plan rendering would just be noise; strip
            // any accumulated context and return the bare error.
            if opt_flags.contains(OptFlags::EAGER) {
                return Err(err.remove_context());
            };

            // Nothing was resolved yet: no partial plan to display.
            let Some(ir_until_then) = lp_arena.last_node() else {
                return Err(err);
            };

            // The innermost context message (set via `failed_here!`) names
            // the DSL node that failed to resolve.
            let node_name = if let PolarsError::Context { msg, .. } = &err {
                msg
            } else {
                "THIS_NODE"
            };
            // The arenas are consumed (taken) to render the partial plan;
            // they are left empty, which is fine on the error path.
            let plan = IRPlan::new(
                ir_until_then,
                std::mem::take(lp_arena),
                std::mem::take(expr_arena),
            );
            let location = format!("{}", plan.display());
            Err(err.wrap_msg(|msg| {
                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
            }))
        },
    }
}
89
90fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
91 let lp_node = ctxt.lp_arena.add(lp);
92 ctxt.conversion_optimizer
93 .optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node, false)
94 .map_err(|e| e.context(format!("'{name}' failed").into()))?;
95
96 Ok(lp_node)
97}
98
99#[recursive]
103pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
104 let owned = Arc::unwrap_or_clone;
105
106 let v = match lp {
107 DslPlan::Scan {
108 sources,
109 unified_scan_args,
110 scan_type,
111 cached_ir,
112 } => scans::dsl_to_ir(sources, unified_scan_args, scan_type, cached_ir, ctxt)?,
113 #[cfg(feature = "python")]
114 DslPlan::PythonScan { options } => {
115 use crate::dsl::python_dsl::PythonOptionsDsl;
116
117 let schema = options.get_schema()?;
118
119 let PythonOptionsDsl {
120 scan_fn,
121 schema_fn: _,
122 python_source,
123 validate_schema,
124 is_pure,
125 } = options;
126
127 IR::PythonScan {
128 options: PythonOptions {
129 scan_fn,
130 schema,
131 python_source,
132 validate_schema,
133 output_schema: Default::default(),
134 with_columns: Default::default(),
135 n_rows: Default::default(),
136 predicate: Default::default(),
137 is_pure,
138 },
139 }
140 },
141 DslPlan::Union { inputs, args } => {
142 let mut inputs = inputs
143 .into_iter()
144 .map(|lp| to_alp_impl(lp, ctxt))
145 .collect::<PolarsResult<Vec<_>>>()
146 .map_err(|e| e.context(failed_here!(vertical concat)))?;
147
148 if args.diagonal {
149 inputs = concat::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;
150 }
151
152 if args.to_supertypes {
153 concat::convert_st_union(
154 &mut inputs,
155 ctxt.lp_arena,
156 ctxt.expr_arena,
157 ctxt.opt_flags,
158 )
159 .map_err(|e| e.context(failed_here!(vertical concat)))?;
160 }
161
162 let first = *inputs.first().ok_or_else(
163 || polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),
164 )?;
165 let schema = ctxt.lp_arena.get(first).schema(ctxt.lp_arena);
166 for n in &inputs[1..] {
167 let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);
168 schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema,\
170 got\n{:?} and \n{:?}", schema, schema_i)
171 )?;
172 }
173
174 let options = args.into();
175 IR::Union { inputs, options }
176 },
177 DslPlan::HConcat { inputs, options } => {
178 let inputs = inputs
179 .into_iter()
180 .map(|lp| to_alp_impl(lp, ctxt))
181 .collect::<PolarsResult<Vec<_>>>()
182 .map_err(|e| e.context(failed_here!(horizontal concat)))?;
183
184 let schema = concat::h_concat_schema(&inputs, ctxt.lp_arena)?;
185
186 IR::HConcat {
187 inputs,
188 schema,
189 options,
190 }
191 },
192 DslPlan::Filter { input, predicate } => {
193 let mut input =
194 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;
195 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
196
197 let mut out = Vec::with_capacity(1);
198 expr_expansion::expand_expression(
199 &predicate,
200 &PlHashSet::default(),
201 input_schema.as_ref().as_ref(),
202 &mut out,
203 ctxt.opt_flags,
204 )?;
205
206 let predicate = match out.len() {
207 1 => {
208 out.pop().unwrap()
210 },
211 0 => {
212 let msg = "The predicate expanded to zero expressions. \
213 This may for example be caused by a regex not matching column names or \
214 a column dtype match not hitting any dtypes in the DataFrame";
215 polars_bail!(ComputeError: msg);
216 },
217 _ => {
218 let mut expanded = String::new();
219 for e in out.iter().take(5) {
220 expanded.push_str(&format!("\t{e:?},\n"))
221 }
222 expanded.pop();
224 if out.len() > 5 {
225 expanded.push_str("\t...\n")
226 }
227
228 let msg = if cfg!(feature = "python") {
229 format!(
230 "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
231 This is ambiguous. Try to combine the predicates with the 'all' or `any' expression."
232 )
233 } else {
234 format!(
235 "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
236 This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression."
237 )
238 };
239 polars_bail!(ComputeError: msg)
240 },
241 };
242 let predicate_ae = to_expr_ir(
243 predicate,
244 &mut ExprToIRContext::new_with_opt_eager(
245 ctxt.expr_arena,
246 &input_schema,
247 ctxt.opt_flags,
248 ),
249 )?;
250
251 if ctxt.opt_flags.predicate_pushdown() {
252 ctxt.nodes_scratch.clear();
253
254 if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(
255 predicate_ae.node(),
256 ctxt.expr_arena,
257 Some(ctxt.nodes_scratch),
258 ctxt.pushdown_maintain_errors,
259 ) {
260 let mut update_input = |predicate: Node| -> PolarsResult<()> {
261 let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
262 ctxt.conversion_optimizer
263 .push_scratch(predicate.node(), ctxt.expr_arena);
264 let lp = IR::Filter { input, predicate };
265 input = run_conversion(lp, ctxt, "filter")?;
266
267 Ok(())
268 };
269
270 for predicate in pushable {
273 update_input(predicate)?;
274 }
275
276 if let Some(node) = fallible {
277 update_input(node)?;
278 }
279
280 return Ok(input);
281 };
282 };
283
284 ctxt.conversion_optimizer
285 .push_scratch(predicate_ae.node(), ctxt.expr_arena);
286 let lp = IR::Filter {
287 input,
288 predicate: predicate_ae,
289 };
290 return run_conversion(lp, ctxt, "filter");
291 },
292 DslPlan::Slice { input, offset, len } => {
293 let input =
294 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;
295 IR::Slice { input, offset, len }
296 },
297 DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
298 df,
299 schema,
300 output_schema: None,
301 },
302 DslPlan::Select {
303 expr,
304 input,
305 options,
306 } => {
307 let input =
308 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
309 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
310 let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)
311 .map_err(|e| e.context(failed_here!(select)))?;
312
313 if exprs.is_empty() {
314 ctxt.lp_arena.replace(input, utils::empty_df());
315 return Ok(input);
316 }
317
318 let eirs = to_expr_irs(
319 exprs,
320 &mut ExprToIRContext::new_with_opt_eager(
321 ctxt.expr_arena,
322 &input_schema,
323 ctxt.opt_flags,
324 ),
325 )?;
326 ctxt.conversion_optimizer
327 .fill_scratch(&eirs, ctxt.expr_arena);
328
329 let schema = Arc::new(schema);
330 let lp = IR::Select {
331 expr: eirs,
332 input,
333 schema,
334 options,
335 };
336
337 return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));
338 },
339 DslPlan::Sort {
340 input,
341 by_column,
342 slice,
343 mut sort_options,
344 } => {
345 let input =
346 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
347 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
348
349 let n_by_exprs = if by_column.len() == 1 {
351 match &by_column[0] {
352 Expr::Selector(s) => s.into_columns(&input_schema, &Default::default())?.len(),
353 _ => 1,
354 }
355 } else {
356 by_column.len()
357 };
358 let n_desc = sort_options.descending.len();
359 polars_ensure!(
360 n_desc == n_by_exprs || n_desc == 1,
361 ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()
362 );
363 let n_nulls_last = sort_options.nulls_last.len();
364 polars_ensure!(
365 n_nulls_last == n_by_exprs || n_nulls_last == 1,
366 ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()
367 );
368
369 let mut expanded_cols = Vec::new();
370 let mut nulls_last = Vec::new();
371 let mut descending = Vec::new();
372
373 for (c, (&n, &d)) in by_column.into_iter().zip(
377 sort_options
378 .nulls_last
379 .iter()
380 .cycle()
381 .zip(sort_options.descending.iter().cycle()),
382 ) {
383 let exprs = utils::expand_expressions(
384 input,
385 vec![c],
386 ctxt.lp_arena,
387 ctxt.expr_arena,
388 ctxt.opt_flags,
389 )
390 .map_err(|e| e.context(failed_here!(sort)))?;
391
392 nulls_last.extend(std::iter::repeat_n(n, exprs.len()));
393 descending.extend(std::iter::repeat_n(d, exprs.len()));
394 expanded_cols.extend(exprs);
395 }
396 sort_options.nulls_last = nulls_last;
397 sort_options.descending = descending;
398
399 ctxt.conversion_optimizer
400 .fill_scratch(&expanded_cols, ctxt.expr_arena);
401 let mut by_column = expanded_cols;
402
403 if by_column.len() > 1 {
405 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
406
407 let mut null_columns = vec![];
408
409 for (i, c) in by_column.iter().enumerate() {
410 if let DataType::Null = c.dtype(&input_schema, ctxt.expr_arena)? {
411 null_columns.push(i);
412 }
413 }
414 if null_columns.len() == by_column.len() {
416 by_column.truncate(1);
417 sort_options.nulls_last.truncate(1);
418 sort_options.descending.truncate(1);
419 }
420 else if !null_columns.is_empty() {
422 for i in null_columns.into_iter().rev() {
423 by_column.remove(i);
424 sort_options.nulls_last.remove(i);
425 sort_options.descending.remove(i);
426 }
427 }
428 }
429 if by_column.is_empty() {
430 return Ok(input);
431 };
432
433 let lp = IR::Sort {
434 input,
435 by_column,
436 slice,
437 sort_options,
438 };
439
440 return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));
441 },
442 DslPlan::Cache { input, id } => {
443 let input = match ctxt.seen_caches.get(&id) {
444 Some(input) => *input,
445 None => {
446 let input = to_alp_impl(owned(input), ctxt)
447 .map_err(|e| e.context(failed_here!(cache)))?;
448 let seen_before = ctxt.seen_caches.insert(id, input);
449 assert!(
450 seen_before.is_none(),
451 "Cache could not have been created in the mean time. That would make the DAG cyclic."
452 );
453 input
454 },
455 };
456
457 IR::Cache { input, id }
458 },
459 DslPlan::GroupBy {
460 input,
461 keys,
462 aggs,
463 apply,
464 maintain_order,
465 options,
466 } => {
467 let input =
468 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;
469
470 if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {
472 ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)
473 }
474
475 let (keys, aggs, schema) = resolve_group_by(
476 input,
477 keys,
478 aggs,
479 &options,
480 ctxt.lp_arena,
481 ctxt.expr_arena,
482 ctxt.opt_flags,
483 )
484 .map_err(|e| e.context(failed_here!(group_by)))?;
485
486 let (apply, schema) = if let Some((apply, schema)) = apply {
487 (Some(apply), schema)
488 } else {
489 (None, schema)
490 };
491
492 ctxt.conversion_optimizer
493 .fill_scratch(&keys, ctxt.expr_arena);
494 ctxt.conversion_optimizer
495 .fill_scratch(&aggs, ctxt.expr_arena);
496
497 let lp = IR::GroupBy {
498 input,
499 keys,
500 aggs,
501 schema,
502 apply,
503 maintain_order,
504 options,
505 };
506
507 return run_conversion(lp, ctxt, "group_by")
508 .map_err(|e| e.context(failed_here!(group_by)));
509 },
510 DslPlan::Join {
511 input_left,
512 input_right,
513 left_on,
514 right_on,
515 predicates,
516 options,
517 } => {
518 return join::resolve_join(
519 Either::Left(input_left),
520 Either::Left(input_right),
521 left_on,
522 right_on,
523 predicates,
524 JoinOptionsIR::from(Arc::unwrap_or_clone(options)),
525 ctxt,
526 )
527 .map_err(|e| e.context(failed_here!(join)))
528 .map(|t| t.0);
529 },
530 DslPlan::HStack {
531 input,
532 exprs,
533 options,
534 } => {
535 let input = to_alp_impl(owned(input), ctxt)
536 .map_err(|e| e.context(failed_here!(with_columns)))?;
537 let (exprs, schema) =
538 resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)
539 .map_err(|e| e.context(failed_here!(with_columns)))?;
540
541 ctxt.conversion_optimizer
542 .fill_scratch(&exprs, ctxt.expr_arena);
543 let lp = IR::HStack {
544 input,
545 exprs,
546 schema,
547 options,
548 };
549 return run_conversion(lp, ctxt, "with_columns");
550 },
551 DslPlan::MatchToSchema {
552 input,
553 match_schema,
554 per_column,
555 extra_columns,
556 } => {
557 let input =
558 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
559 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
560
561 assert_eq!(per_column.len(), match_schema.len());
562
563 if input_schema.as_ref() == &match_schema {
564 return Ok(input);
565 }
566
567 let mut exprs = Vec::with_capacity(match_schema.len());
568 let mut found_missing_columns = Vec::new();
569 let mut used_input_columns = 0;
570
571 for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {
572 match input_schema.get(column) {
573 None => match &per_column.missing_columns {
574 MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),
575 MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(
576 Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(
577 dtype.clone(),
578 )))),
579 column.clone(),
580 )),
581 MissingColumnsPolicyOrExpr::InsertWith(expr) => {
582 exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))
583 },
584 },
585 Some(input_dtype) if dtype == input_dtype => {
586 used_input_columns += 1;
587 exprs.push(Expr::Column(column.clone()))
588 },
589 Some(input_dtype) => {
590 let from_dtype = input_dtype;
591 let to_dtype = dtype;
592
593 let policy = CastColumnsPolicy {
594 integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,
595 float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,
596 missing_struct_fields: per_column.missing_struct_fields,
597 extra_struct_fields: per_column.extra_struct_fields,
598
599 ..Default::default()
600 };
601
602 let should_cast =
603 policy.should_cast_column(column, to_dtype, from_dtype)?;
604
605 let mut expr = Expr::Column(PlSmallStr::from_str(column));
606 if should_cast {
607 expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);
608 }
609
610 used_input_columns += 1;
611 exprs.push(expr);
612 },
613 }
614 }
615
616 if let Some(lst) = found_missing_columns.first() {
618 use std::fmt::Write;
619 let mut formatted = String::new();
620 write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();
621 for c in &found_missing_columns[1..] {
622 write!(&mut formatted, ", \"{c}\"").unwrap();
623 }
624
625 write!(&mut formatted, "\"{lst}\"").unwrap();
626 polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");
627 }
628
629 if used_input_columns != input_schema.len()
631 && extra_columns == ExtraColumnsPolicy::Raise
632 {
633 let found_extra_columns = input_schema
634 .iter_names()
635 .filter(|n| !match_schema.contains(n))
636 .collect::<Vec<_>>();
637
638 use std::fmt::Write;
639 let mut formatted = String::new();
640 write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();
641 for c in &found_extra_columns[1..] {
642 write!(&mut formatted, ", \"{c}\"").unwrap();
643 }
644
645 polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");
646 }
647
648 let exprs = to_expr_irs(
649 exprs,
650 &mut ExprToIRContext::new_with_opt_eager(
651 ctxt.expr_arena,
652 &input_schema,
653 ctxt.opt_flags,
654 ),
655 )?;
656
657 ctxt.conversion_optimizer
658 .fill_scratch(&exprs, ctxt.expr_arena);
659 let lp = IR::Select {
660 input,
661 expr: exprs,
662 schema: match_schema.clone(),
663 options: ProjectionOptions {
664 run_parallel: true,
665 duplicate_check: false,
666 should_broadcast: true,
667 },
668 };
669 return run_conversion(lp, ctxt, "match_to_schema");
670 },
671 DslPlan::PipeWithSchema { input, callback } => {
672 let input_owned = owned(input);
673
674 let input = to_alp_impl(input_owned.clone(), ctxt)
676 .map_err(|e| e.context(failed_here!(pipe_with_schema)))?;
677 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
678
679 let input_owned = DslPlan::IR {
680 dsl: Arc::new(input_owned),
681 version: ctxt.lp_arena.version(),
682 node: Some(input),
683 };
684
685 let input_adjusted =
687 callback.call((input_owned, Arc::unwrap_or_clone(input_schema.into_owned())))?;
688 return to_alp_impl(input_adjusted, ctxt);
689 },
690 DslPlan::Distinct { input, options } => {
691 let input =
692 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
693 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
694
695 let subset = options
696 .subset
697 .map(|s| {
698 PolarsResult::Ok(
699 s.into_columns(input_schema.as_ref(), &Default::default())?
700 .into_iter()
701 .collect(),
702 )
703 })
704 .transpose()?;
705
706 let options = DistinctOptionsIR {
707 subset,
708 maintain_order: options.maintain_order,
709 keep_strategy: options.keep_strategy,
710 slice: None,
711 };
712
713 IR::Distinct { input, options }
714 },
715 DslPlan::MapFunction { input, function } => {
716 let input = to_alp_impl(owned(input), ctxt)
717 .map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;
718 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
719
720 match function {
721 DslFunction::Explode {
722 columns,
723 allow_empty,
724 } => {
725 let columns = columns.into_columns(&input_schema, &Default::default())?;
726 polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");
727 if columns.is_empty() {
728 return Ok(input);
729 }
730 let function = FunctionIR::Explode {
731 columns: columns.into_iter().collect(),
732 schema: Default::default(),
733 };
734 let ir = IR::MapFunction { input, function };
735 return Ok(ctxt.lp_arena.add(ir));
736 },
737 DslFunction::FillNan(fill_value) => {
738 let exprs = input_schema
739 .iter()
740 .filter_map(|(name, dtype)| match dtype {
741 DataType::Float32 | DataType::Float64 => Some(
742 col(name.clone())
743 .fill_nan(fill_value.clone())
744 .alias(name.clone()),
745 ),
746 _ => None,
747 })
748 .collect::<Vec<_>>();
749
750 let (exprs, schema) = resolve_with_columns(
751 exprs,
752 input,
753 ctxt.lp_arena,
754 ctxt.expr_arena,
755 ctxt.opt_flags,
756 )
757 .map_err(|e| e.context(failed_here!(fill_nan)))?;
758
759 ctxt.conversion_optimizer
760 .fill_scratch(&exprs, ctxt.expr_arena);
761
762 let lp = IR::HStack {
763 input,
764 exprs,
765 schema,
766 options: ProjectionOptions {
767 duplicate_check: false,
768 ..Default::default()
769 },
770 };
771 return run_conversion(lp, ctxt, "fill_nan");
772 },
773 DslFunction::Stats(sf) => {
774 let exprs = match sf {
775 StatsFunction::Var { ddof } => stats_helper(
776 |dt| dt.is_primitive_numeric() || dt.is_bool(),
777 |name| col(name.clone()).var(ddof),
778 &input_schema,
779 ),
780 StatsFunction::Std { ddof } => stats_helper(
781 |dt| dt.is_primitive_numeric() || dt.is_bool(),
782 |name| col(name.clone()).std(ddof),
783 &input_schema,
784 ),
785 StatsFunction::Quantile { quantile, method } => stats_helper(
786 |dt| dt.is_primitive_numeric(),
787 |name| col(name.clone()).quantile(quantile.clone(), method),
788 &input_schema,
789 ),
790 StatsFunction::Mean => stats_helper(
791 |dt| {
792 dt.is_primitive_numeric()
793 || dt.is_temporal()
794 || dt == &DataType::Boolean
795 },
796 |name| col(name.clone()).mean(),
797 &input_schema,
798 ),
799 StatsFunction::Sum => stats_helper(
800 |dt| {
801 dt.is_primitive_numeric()
802 || dt.is_decimal()
803 || matches!(dt, DataType::Boolean | DataType::Duration(_))
804 },
805 |name| col(name.clone()).sum(),
806 &input_schema,
807 ),
808 StatsFunction::Min => stats_helper(
809 |dt| dt.is_ord(),
810 |name| col(name.clone()).min(),
811 &input_schema,
812 ),
813 StatsFunction::Max => stats_helper(
814 |dt| dt.is_ord(),
815 |name| col(name.clone()).max(),
816 &input_schema,
817 ),
818 StatsFunction::Median => stats_helper(
819 |dt| {
820 dt.is_primitive_numeric()
821 || dt.is_temporal()
822 || dt == &DataType::Boolean
823 },
824 |name| col(name.clone()).median(),
825 &input_schema,
826 ),
827 };
828 let schema = Arc::new(expressions_to_schema(&exprs, &input_schema)?);
829 let eirs = to_expr_irs(
830 exprs,
831 &mut ExprToIRContext::new_with_opt_eager(
832 ctxt.expr_arena,
833 &input_schema,
834 ctxt.opt_flags,
835 ),
836 )?;
837
838 ctxt.conversion_optimizer
839 .fill_scratch(&eirs, ctxt.expr_arena);
840
841 let lp = IR::Select {
842 input,
843 expr: eirs,
844 schema,
845 options: ProjectionOptions {
846 duplicate_check: false,
847 ..Default::default()
848 },
849 };
850 return run_conversion(lp, ctxt, "stats");
851 },
852 DslFunction::Rename {
853 existing,
854 new,
855 strict,
856 } => {
857 assert_eq!(existing.len(), new.len());
858 if existing.is_empty() {
859 return Ok(input);
860 }
861
862 let existing_lut =
863 PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));
864
865 let mut schema = Schema::with_capacity(input_schema.len());
866 let mut num_replaced = 0;
867
868 let expr = input_schema
870 .iter()
871 .map(|(n, dtype)| {
872 Ok(match existing_lut.get_index_of(n.as_str()) {
873 None => {
874 schema.try_insert(n.clone(), dtype.clone())?;
875 Expr::Column(n.clone())
876 },
877 Some(i) => {
878 num_replaced += 1;
879 schema.try_insert(new[i].clone(), dtype.clone())?;
880 Expr::Column(n.clone()).alias(new[i].clone())
881 },
882 })
883 })
884 .collect::<PolarsResult<Vec<_>>>()?;
885
886 if strict && num_replaced != existing.len() {
887 let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();
888 polars_bail!(col_not_found = col);
889 }
890
891 if num_replaced == 0 {
893 return Ok(input);
894 }
895
896 let expr = to_expr_irs(
897 expr,
898 &mut ExprToIRContext::new_with_opt_eager(
899 ctxt.expr_arena,
900 &input_schema,
901 ctxt.opt_flags,
902 ),
903 )?;
904 ctxt.conversion_optimizer
905 .fill_scratch(&expr, ctxt.expr_arena);
906
907 IR::Select {
908 input,
909 expr,
910 schema: Arc::new(schema),
911 options: ProjectionOptions {
912 run_parallel: false,
913 duplicate_check: false,
914 should_broadcast: false,
915 },
916 }
917 },
918 _ => {
919 let function = function.into_function_ir(&input_schema)?;
920 IR::MapFunction { input, function }
921 },
922 }
923 },
924 DslPlan::ExtContext { input, contexts } => {
925 let input = to_alp_impl(owned(input), ctxt)
926 .map_err(|e| e.context(failed_here!(with_context)))?;
927 let contexts = contexts
928 .into_iter()
929 .map(|lp| to_alp_impl(lp, ctxt))
930 .collect::<PolarsResult<Vec<_>>>()
931 .map_err(|e| e.context(failed_here!(with_context)))?;
932
933 let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
934 for input in &contexts {
935 let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
936 for fld in other_schema.iter_fields() {
937 if schema.get(fld.name()).is_none() {
938 schema.with_column(fld.name, fld.dtype);
939 }
940 }
941 }
942
943 IR::ExtContext {
944 input,
945 contexts,
946 schema: Arc::new(schema),
947 }
948 },
949 DslPlan::Sink { input, payload } => {
950 let input =
951 to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
952 let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
953 let payload = match payload {
954 SinkType::Memory => SinkTypeIR::Memory,
955 SinkType::File(f) => SinkTypeIR::File(f),
956 SinkType::Partition(f) => SinkTypeIR::Partition(PartitionSinkTypeIR {
957 base_path: f.base_path,
958 file_path_cb: f.file_path_cb,
959 file_type: f.file_type,
960 sink_options: f.sink_options,
961 variant: match f.variant {
962 PartitionVariant::MaxSize(max_size) => {
963 PartitionVariantIR::MaxSize(max_size)
964 },
965 PartitionVariant::Parted {
966 key_exprs,
967 include_key,
968 } => {
969 let eirs = to_expr_irs(
970 key_exprs,
971 &mut ExprToIRContext::new_with_opt_eager(
972 ctxt.expr_arena,
973 &input_schema,
974 ctxt.opt_flags,
975 ),
976 )?;
977 ctxt.conversion_optimizer
978 .fill_scratch(&eirs, ctxt.expr_arena);
979
980 PartitionVariantIR::Parted {
981 key_exprs: eirs,
982 include_key,
983 }
984 },
985 PartitionVariant::ByKey {
986 key_exprs,
987 include_key,
988 } => {
989 let eirs = to_expr_irs(
990 key_exprs,
991 &mut ExprToIRContext::new_with_opt_eager(
992 ctxt.expr_arena,
993 &input_schema,
994 ctxt.opt_flags,
995 ),
996 )?;
997 ctxt.conversion_optimizer
998 .fill_scratch(&eirs, ctxt.expr_arena);
999
1000 PartitionVariantIR::ByKey {
1001 key_exprs: eirs,
1002 include_key,
1003 }
1004 },
1005 },
1006 cloud_options: f.cloud_options,
1007 per_partition_sort_by: match f.per_partition_sort_by {
1008 None => None,
1009 Some(sort_by) => Some(
1010 sort_by
1011 .into_iter()
1012 .map(|s| {
1013 let expr = to_expr_ir(
1014 s.expr,
1015 &mut ExprToIRContext::new_with_opt_eager(
1016 ctxt.expr_arena,
1017 &input_schema,
1018 ctxt.opt_flags,
1019 ),
1020 )?;
1021 ctxt.conversion_optimizer
1022 .push_scratch(expr.node(), ctxt.expr_arena);
1023 Ok(SortColumnIR {
1024 expr,
1025 descending: s.descending,
1026 nulls_last: s.nulls_last,
1027 })
1028 })
1029 .collect::<PolarsResult<Vec<_>>>()?,
1030 ),
1031 },
1032 finish_callback: f.finish_callback,
1033 }),
1034 };
1035
1036 let lp = IR::Sink { input, payload };
1037 return run_conversion(lp, ctxt, "sink");
1038 },
1039 DslPlan::SinkMultiple { inputs } => {
1040 let inputs = inputs
1041 .into_iter()
1042 .map(|lp| to_alp_impl(lp, ctxt))
1043 .collect::<PolarsResult<Vec<_>>>()
1044 .map_err(|e| e.context(failed_here!(vertical concat)))?;
1045 IR::SinkMultiple { inputs }
1046 },
1047 #[cfg(feature = "merge_sorted")]
1048 DslPlan::MergeSorted {
1049 input_left,
1050 input_right,
1051 key,
1052 } => {
1053 let input_left = to_alp_impl(owned(input_left), ctxt)
1054 .map_err(|e| e.context(failed_here!(merge_sorted)))?;
1055 let input_right = to_alp_impl(owned(input_right), ctxt)
1056 .map_err(|e| e.context(failed_here!(merge_sorted)))?;
1057
1058 let left_schema = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
1059 let right_schema = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);
1060
1061 left_schema
1062 .ensure_is_exact_match(&right_schema)
1063 .map_err(|err| err.context("merge_sorted".into()))?;
1064
1065 left_schema
1066 .try_get(key.as_str())
1067 .map_err(|err| err.context("merge_sorted".into()))?;
1068
1069 IR::MergeSorted {
1070 input_left,
1071 input_right,
1072 key,
1073 }
1074 },
1075 DslPlan::IR { node, dsl, version } => {
1076 return match node {
1077 Some(node)
1078 if version == ctxt.lp_arena.version()
1079 && ctxt.conversion_optimizer.used_arenas.insert(version) =>
1080 {
1081 Ok(node)
1082 },
1083 _ => to_alp_impl(owned(dsl), ctxt),
1084 };
1085 },
1086 };
1087 Ok(ctxt.lp_arena.add(v))
1088}
1089
1090fn resolve_with_columns(
1091 exprs: Vec<Expr>,
1092 input: Node,
1093 lp_arena: &Arena<IR>,
1094 expr_arena: &mut Arena<AExpr>,
1095 opt_flags: &mut OptFlags,
1096) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
1097 let input_schema = lp_arena.get(input).schema(lp_arena);
1098 let mut output_schema = (**input_schema).clone();
1099 let exprs = rewrite_projections(exprs, &PlHashSet::new(), &input_schema, opt_flags)?;
1100 let mut output_names = PlHashSet::with_capacity(exprs.len());
1101
1102 let eirs = to_expr_irs(
1103 exprs,
1104 &mut ExprToIRContext::new_with_opt_eager(expr_arena, &input_schema, opt_flags),
1105 )?;
1106 for eir in eirs.iter() {
1107 let field = eir.field(&input_schema, expr_arena)?;
1108
1109 if !output_names.insert(field.name().clone()) {
1110 let msg = format!(
1111 "the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\
1112 It's possible that multiple expressions are returning the same default column name. \
1113 If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
1114 duplicate column names.",
1115 field.name()
1116 );
1117 polars_bail!(ComputeError: msg)
1118 }
1119 output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
1120 }
1121
1122 Ok((eirs, Arc::new(output_schema)))
1123}
1124
/// Resolves group-by keys and aggregations against the schema of `input`
/// and computes the output schema of the group-by node.
///
/// Returns `(keys, aggs, output_schema)` with both expression lists
/// converted to `ExprIR`. With the `dynamic_group_by` feature, a
/// rolling/dynamic group-by implicitly adds its index column (and, when
/// requested, boundary columns) to the key names and output schema.
fn resolve_group_by(
    input: Node,
    keys: Vec<Expr>,
    aggs: Vec<Expr>,
    _options: &GroupbyOptions,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let input_schema = input_schema.as_ref();
    let mut keys = rewrite_projections(keys, &PlHashSet::default(), input_schema, opt_flags)?;

    // The output schema starts with the (expanded) key columns.
    let mut output_schema = expressions_to_schema(&keys, input_schema)?;
    let mut key_names: PlHashSet<PlSmallStr> = output_schema.iter_names().cloned().collect();

    #[allow(unused_mut)]
    let mut pop_keys = false;
    #[cfg(feature = "dynamic_group_by")]
    {
        // Rolling/dynamic group-by implicitly uses the index column: it is
        // pushed as a temporary key here and popped again below (after the
        // schema length has been recorded).
        if let Some(options) = _options.rolling.as_ref() {
            let name = options.index_column.clone();
            let dtype = input_schema.try_get(name.as_str())?;
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            pop_keys = true;
            output_schema.with_column(name.clone(), dtype.clone());
        } else if let Some(options) = _options.dynamic.as_ref() {
            let name = options.index_column.clone();
            keys.push(col(name.clone()));
            key_names.insert(name.clone());
            pop_keys = true;
            let dtype = input_schema.try_get(name.as_str())?;
            if options.include_boundaries {
                output_schema.with_column("_lower_boundary".into(), dtype.clone());
                output_schema.with_column("_upper_boundary".into(), dtype.clone());
            }
            output_schema.with_column(name.clone(), dtype.clone());
        }
    }
    let keys_index_len = output_schema.len();
    if pop_keys {
        let _ = keys.pop();
    }
    let keys = to_expr_irs(
        keys,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;

    // Key names are excluded when expanding wildcard-like aggregations.
    let aggs = rewrite_projections(aggs, &key_names, input_schema, opt_flags)?;
    let aggs = to_expr_irs(
        aggs,
        &mut ExprToIRContext::new_with_opt_eager(expr_arena, input_schema, opt_flags),
    )?;
    utils::validate_expressions(&keys, expr_arena, input_schema, "group by")?;
    utils::validate_expressions(&aggs, expr_arena, input_schema, "group by")?;

    let mut aggs_schema = expr_irs_to_schema(&aggs, input_schema, expr_arena);

    // Fewer schema entries than aggregations means duplicate output names;
    // find the offender and raise.
    if aggs_schema.len() < aggs.len() {
        let mut names = PlHashSet::with_capacity(aggs.len());
        for agg in aggs.iter() {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    debug_assert!(aggs_schema.len() == aggs.len());
    // Non-scalar aggregation outputs are imploded into list dtypes.
    for ((_name, dtype), expr) in aggs_schema.iter_mut().zip(&aggs) {
        if !expr.is_scalar(expr_arena) {
            *dtype = dtype.clone().implode();
        }
    }

    output_schema.merge(aggs_schema);

    // A merged schema smaller than keys + aggs means an aggregation name
    // collides with a key name; find the offender and raise.
    if output_schema.len() < (keys_index_len + aggs.len()) {
        let mut names = PlHashSet::with_capacity(output_schema.len());
        for agg in aggs.iter().chain(keys.iter()) {
            let name = agg.output_name();
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }

    Ok((keys, aggs, Arc::new(output_schema)))
}
1219fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
1220where
1221 F: Fn(&DataType) -> bool,
1222 E: Fn(&PlSmallStr) -> Expr,
1223{
1224 schema
1225 .iter()
1226 .map(|(name, dt)| {
1227 if condition(dt) {
1228 expr(name)
1229 } else {
1230 lit(NULL).cast(dt.clone()).alias(name.clone())
1231 }
1232 })
1233 .collect()
1234}
1235
1236pub(crate) fn maybe_init_projection_excluding_hive(
1237 reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
1238 hive_parts: Option<&SchemaRef>,
1239) -> Option<Arc<[PlSmallStr]>> {
1240 let hive_schema = hive_parts?;
1243
1244 match &reader_schema {
1245 Either::Left(reader_schema) => hive_schema
1246 .iter_names()
1247 .any(|x| reader_schema.contains(x))
1248 .then(|| {
1249 reader_schema
1250 .iter_names_cloned()
1251 .filter(|x| !hive_schema.contains(x))
1252 .collect::<Arc<[_]>>()
1253 }),
1254 Either::Right(reader_schema) => hive_schema
1255 .iter_names()
1256 .any(|x| reader_schema.contains(x))
1257 .then(|| {
1258 reader_schema
1259 .iter_names_cloned()
1260 .filter(|x| !hive_schema.contains(x))
1261 .collect::<Arc<[_]>>()
1262 }),
1263 }
1264}