use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::{is_regex_projection, rewrite_projections};
use hive::hive_partitions_from_paths;
use polars_core::chunked_array::cast::CastOptions;
use polars_utils::unique_id::UniqueId;

use super::convert_utils::SplitPredicates;
use super::stack_opt::ConversionOptimizer;
use super::*;
use crate::plans::conversion::expr_expansion::expand_selectors;

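/// Expands multi-output expressions (wildcards, regexes, selectors) against the
/// input schema and lowers the result into `ExprIR`s.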
fn expand_expressions(
    input: Node,
    exprs: Vec<Expr>,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<Vec<ExprIR>> {
    let schema = lp_arena.get(input).schema(lp_arena);
    let exprs = rewrite_projections(exprs, &schema, &[], opt_flags)?;
    to_expr_irs(exprs, expr_arena, &schema)
}

fn empty_df() -> IR {
    IR::DataFrameScan {
        df: Arc::new(Default::default()),
        schema: Arc::new(Default::default()),
        output_schema: None,
    }
}

fn validate_expression(
    node: Node,
    expr_arena: &Arena<AExpr>,
    input_schema: &Schema,
    operation_name: &str,
) -> PolarsResult<()> {
    let iter = aexpr_to_leaf_names_iter(node, expr_arena);
    validate_columns_in_input(iter, input_schema, operation_name)
}

fn validate_expressions<N: Into<Node>, I: IntoIterator<Item = N>>(
    nodes: I,
    expr_arena: &Arena<AExpr>,
    input_schema: &Schema,
    operation_name: &str,
) -> PolarsResult<()> {
    let nodes = nodes.into_iter();

    for node in nodes {
        validate_expression(node.into(), expr_arena, input_schema, operation_name)?
    }
    Ok(())
}

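/// Builds a short context label from raw tokens for error messages, e.g.
/// `failed_here!(parquet scan)` yields `"'parquet scan'"`.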
macro_rules! failed_here {
    ($($t:tt)*) => {
        format!("'{}'", stringify!($($t)*)).into()
    }
}
pub(super) use failed_here;

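/// Converts a [`DslPlan`] into IR, lowering expressions into `expr_arena` and
/// plan nodes into `lp_arena`. Outside of eager mode, a failure is annotated
/// with the partially resolved plan to show where conversion stopped.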
pub fn to_alp(
    lp: DslPlan,
    expr_arena: &mut Arena<AExpr>,
    lp_arena: &mut Arena<IR>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<Node> {
    let conversion_optimizer = ConversionOptimizer::new(
        opt_flags.contains(OptFlags::SIMPLIFY_EXPR),
        opt_flags.contains(OptFlags::TYPE_COERCION),
        opt_flags.contains(OptFlags::TYPE_CHECK),
    );

    let mut ctxt = DslConversionContext {
        expr_arena,
        lp_arena,
        conversion_optimizer,
        opt_flags,
        nodes_scratch: &mut unitvec![],
        pushdown_maintain_errors: optimizer::pushdown_maintain_errors(),
    };

    match to_alp_impl(lp, &mut ctxt) {
        Ok(out) => Ok(out),
        Err(err) => {
            if opt_flags.contains(OptFlags::EAGER) {
                return Err(err.remove_context());
            };

            let Some(ir_until_then) = lp_arena.last_node() else {
                return Err(err);
            };

            let node_name = if let PolarsError::Context { msg, .. } = &err {
                msg
            } else {
                "THIS_NODE"
            };
            let plan = IRPlan::new(
                ir_until_then,
                std::mem::take(lp_arena),
                std::mem::take(expr_arena),
            );
            let location = format!("{}", plan.display());
            Err(err.wrap_msg(|msg| {
                format!("{msg}\n\nResolved plan until failure:\n\n\t---> FAILED HERE RESOLVING {node_name} <---\n{location}")
            }))
        },
    }
}

pub(super) struct DslConversionContext<'a> {
    pub(super) expr_arena: &'a mut Arena<AExpr>,
    pub(super) lp_arena: &'a mut Arena<IR>,
    pub(super) conversion_optimizer: ConversionOptimizer,
    pub(super) opt_flags: &'a mut OptFlags,
    pub(super) nodes_scratch: &'a mut UnitVec<Node>,
    pub(super) pushdown_maintain_errors: bool,
}

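/// Adds the IR node to the arena and runs the conversion-time optimizer
/// (simplification, type coercion, type checking) over its expressions.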
pub(super) fn run_conversion(
    lp: IR,
    ctxt: &mut DslConversionContext,
    name: &str,
) -> PolarsResult<Node> {
    let lp_node = ctxt.lp_arena.add(lp);
    ctxt.conversion_optimizer
        .optimize_exprs(ctxt.expr_arena, ctxt.lp_arena, lp_node)
        .map_err(|e| e.context(format!("'{name}' failed").into()))?;

    Ok(lp_node)
}

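/// Recursively converts a [`DslPlan`] into IR nodes; `#[recursive]` guards
/// against stack overflow on deeply nested plans.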
#[recursive]
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
    let owned = Arc::unwrap_or_clone;

    let v = match lp {
        DslPlan::Scan {
            sources,
            file_info,
            unified_scan_args: mut unified_scan_args_box,
            scan_type,
            cached_ir,
        } => {
            let mut cached_ir = cached_ir.lock().unwrap();

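            // Resolve the scan once and cache the resulting IR; later conversions of
            // the same DSL node reuse the cached value.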
            if cached_ir.is_none() {
                let cloud_options = unified_scan_args_box.cloud_options.clone();
                let cloud_options = cloud_options.as_ref();

                let unified_scan_args = unified_scan_args_box.as_mut();
                let mut scan_type = scan_type.clone();

                if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
                    match unified_scan_args.hive_options.enabled {
                        None if !hive_schema.is_empty() => {
                            unified_scan_args.hive_options.enabled = Some(true)
                        },
                        Some(false) => polars_bail!(
                            ComputeError:
                            "a hive schema was given but hive_partitioning was disabled"
                        ),
                        Some(true) | None => {},
                    }
                }

                let sources =
                    match &*scan_type {
                        #[cfg(feature = "parquet")]
                        FileScan::Parquet { .. } => sources
                            .expand_paths_with_hive_update(unified_scan_args, cloud_options)?,
                        #[cfg(feature = "ipc")]
                        FileScan::Ipc { .. } => sources
                            .expand_paths_with_hive_update(unified_scan_args, cloud_options)?,
                        #[cfg(feature = "csv")]
                        FileScan::Csv { .. } => {
                            sources.expand_paths(unified_scan_args, cloud_options)?
                        },
                        #[cfg(feature = "json")]
                        FileScan::NDJson { .. } => {
                            sources.expand_paths(unified_scan_args, cloud_options)?
                        },
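                        // Several downstream checks short-circuit when `sources` is
                        // empty, so a placeholder path is used for dataset scans.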
                        #[cfg(feature = "python")]
                        FileScan::PythonDataset { .. } => {
                            ScanSources::Paths(Arc::from(["dummy".into()]))
                        },
                        FileScan::Anonymous { .. } => sources,
                    };

                let mut file_info = match &mut *scan_type {
                    #[cfg(feature = "parquet")]
                    FileScan::Parquet { options, metadata } => {
                        if let Some(schema) = &options.schema {
                            FileInfo {
                                schema: schema.clone(),
                                reader_schema: Some(either::Either::Left(Arc::new(
                                    schema.to_arrow(CompatLevel::newest()),
                                ))),
                                row_estimation: (None, 0),
                            }
                        } else {
                            let (file_info, md) = scans::parquet_file_info(
                                &sources,
                                unified_scan_args.row_index.as_ref(),
                                cloud_options,
                            )
                            .map_err(|e| e.context(failed_here!(parquet scan)))?;

                            *metadata = md;
                            file_info
                        }
                    },
                    #[cfg(feature = "ipc")]
                    FileScan::Ipc { metadata, .. } => {
                        let (file_info, md) = scans::ipc_file_info(
                            &sources,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                        )
                        .map_err(|e| e.context(failed_here!(ipc scan)))?;
                        *metadata = Some(Arc::new(md));
                        file_info
                    },
                    #[cfg(feature = "csv")]
                    FileScan::Csv { options } => {
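                        // With a user-supplied schema and a header row, tolerate files
                        // that are missing some of the schema's columns.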
                        if options.schema.is_some() && options.has_header {
                            unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                        }

                        scans::csv_file_info(
                            &sources,
                            unified_scan_args.row_index.as_ref(),
                            options,
                            cloud_options,
                        )
                        .map_err(|e| e.context(failed_here!(csv scan)))?
                    },
                    #[cfg(feature = "json")]
                    FileScan::NDJson { options } => scans::ndjson_file_info(
                        &sources,
                        unified_scan_args.row_index.as_ref(),
                        options,
                        cloud_options,
                    )
                    .map_err(|e| e.context(failed_here!(ndjson scan)))?,
                    #[cfg(feature = "python")]
                    FileScan::PythonDataset { dataset_object, .. } => {
                        if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                            polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                        }

                        let mut schema = dataset_object.schema()?;
                        let reader_schema = schema.clone();

                        if let Some(row_index) = &unified_scan_args.row_index {
                            insert_row_index_to_schema(
                                Arc::make_mut(&mut schema),
                                row_index.name.clone(),
                            )?;
                        }

                        FileInfo {
                            schema,
                            reader_schema: Some(either::Either::Right(reader_schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    },
                    FileScan::Anonymous { .. } => {
                        file_info.expect("FileInfo should be set for AnonymousScan")
                    },
                };

                if unified_scan_args.hive_options.enabled.is_none() {
                    unified_scan_args.hive_options.enabled = Some(false);
                }

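                // Derive hive partition columns from the file paths when hive
                // partitioning is enabled and a reader schema is available.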
                let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
                    && file_info.reader_schema.is_some()
                {
                    let paths = sources.as_paths().ok_or_else(|| {
                        polars_err!(nyi = "Hive-partitioning of in-memory buffers")
                    })?;

                    #[allow(unused_assignments)]
                    let mut owned = None;

                    hive_partitions_from_paths(
                        paths,
                        unified_scan_args.hive_options.hive_start_idx,
                        unified_scan_args.hive_options.schema.clone(),
                        match file_info.reader_schema.as_ref().unwrap() {
                            Either::Left(v) => {
                                owned = Some(Schema::from_arrow_schema(v.as_ref()));
                                owned.as_ref().unwrap()
                            },
                            Either::Right(v) => v.as_ref(),
                        },
                        unified_scan_args.hive_options.try_parse_dates,
                    )?
                } else {
                    None
                };

                if let Some(ref hive_parts) = hive_parts {
                    let hive_schema = hive_parts.schema();
                    file_info.update_schema_with_hive_schema(hive_schema.clone());
                } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
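                    // A hive schema was supplied but no hive parts were derived (e.g.
                    // path expansion found no files); still fold the hive columns into
                    // the schema so an empty result keeps the expected shape.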
                    file_info.update_schema_with_hive_schema(hive_schema);
                }

                if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
                    let schema = Arc::make_mut(&mut file_info.schema);

                    if schema.contains(file_path_col) {
                        polars_bail!(
                            Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                            file_path_col
                        );
                    }

                    schema.insert_at_index(
                        schema.len(),
                        file_path_col.clone(),
                        DataType::String,
                    )?;
                }

                unified_scan_args.projection = if file_info.reader_schema.is_some() {
                    maybe_init_projection_excluding_hive(
                        file_info.reader_schema.as_ref().unwrap(),
                        hive_parts.as_ref().map(|h| h.schema()),
                    )
                } else {
                    None
                };

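                // The row index column always occupies position 0 of the schema.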
                if let Some(row_index) = &unified_scan_args.row_index {
                    let schema = Arc::make_mut(&mut file_info.schema);
                    *schema = schema
                        .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                        .unwrap();
                }

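                // No resolved sources (anonymous scans aside) collapses to an empty
                // in-memory DataFrame scan with the resolved schema.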
                let ir = if sources.is_empty() && !matches!(&*scan_type, FileScan::Anonymous { .. })
                {
                    IR::DataFrameScan {
                        df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                        schema: file_info.schema,
                        output_schema: None,
                    }
                } else {
                    let unified_scan_args = unified_scan_args_box;

                    IR::Scan {
                        sources,
                        file_info,
                        hive_parts,
                        predicate: None,
                        scan_type,
                        output_schema: None,
                        unified_scan_args,
                        id: Default::default(),
                    }
                };

                cached_ir.replace(ir);
            }

            cached_ir.clone().unwrap()
        },
        #[cfg(feature = "python")]
        DslPlan::PythonScan { mut options } => {
            let scan_fn = options.scan_fn.take();
            let schema = options.get_schema()?;
            IR::PythonScan {
                options: PythonOptions {
                    scan_fn,
                    schema,
                    python_source: options.python_source,
                    validate_schema: options.validate_schema,
                    output_schema: Default::default(),
                    with_columns: Default::default(),
                    n_rows: Default::default(),
                    predicate: Default::default(),
                },
            }
        },
        DslPlan::Union { inputs, args } => {
            let mut inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(vertical concat)))?;

            if args.diagonal {
                inputs =
                    convert_utils::convert_diagonal_concat(inputs, ctxt.lp_arena, ctxt.expr_arena)?;
            }

            if args.to_supertypes {
                convert_utils::convert_st_union(&mut inputs, ctxt.lp_arena, ctxt.expr_arena)
                    .map_err(|e| e.context(failed_here!(vertical concat)))?;
            }

            let first = *inputs.first().ok_or_else(
                || polars_err!(InvalidOperation: "expected at least one input in 'union'/'concat'"),
            )?;
            let schema = ctxt.lp_arena.get(first).schema(ctxt.lp_arena);
            for n in &inputs[1..] {
                let schema_i = ctxt.lp_arena.get(*n).schema(ctxt.lp_arena);
                schema_i.matches_schema(schema.as_ref()).map_err(|_| polars_err!(InvalidOperation: "'union'/'concat' inputs should all have the same schema,\
                    got\n{:?} and \n{:?}", schema, schema_i)
                )?;
            }

            let options = args.into();
            IR::Union { inputs, options }
        },
        DslPlan::HConcat { inputs, options } => {
            let inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(horizontal concat)))?;

            let schema = convert_utils::h_concat_schema(&inputs, ctxt.lp_arena)?;

            IR::HConcat {
                inputs,
                schema,
                options,
            }
        },
        DslPlan::Filter { input, predicate } => {
            let mut input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(filter)))?;
            let schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let predicate = expand_filter(predicate, input, ctxt.lp_arena, ctxt.opt_flags)
                .map_err(|e| e.context(failed_here!(filter)))?;

            let predicate_ae = to_expr_ir(predicate.clone(), ctxt.expr_arena, &schema)?;

            if ctxt.opt_flags.predicate_pushdown() {
                ctxt.nodes_scratch.clear();

                if let Some(SplitPredicates { pushable, fallible }) = SplitPredicates::new(
                    predicate_ae.node(),
                    ctxt.expr_arena,
                    Some(ctxt.nodes_scratch),
                    ctxt.pushdown_maintain_errors,
                ) {
                    let mut update_input = |predicate: Node| -> PolarsResult<()> {
                        let predicate = ExprIR::from_node(predicate, ctxt.expr_arena);
                        ctxt.conversion_optimizer
                            .push_scratch(predicate.node(), ctxt.expr_arena);
                        let lp = IR::Filter { input, predicate };
                        input = run_conversion(lp, ctxt, "filter")?;

                        Ok(())
                    };

                    for predicate in pushable {
                        update_input(predicate)?;
                    }

                    if let Some(node) = fallible {
                        update_input(node)?;
                    }

                    return Ok(input);
                };
            };

            ctxt.conversion_optimizer
                .push_scratch(predicate_ae.node(), ctxt.expr_arena);
            let lp = IR::Filter {
                input,
                predicate: predicate_ae,
            };
            return run_conversion(lp, ctxt, "filter");
        },
        DslPlan::Slice { input, offset, len } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(slice)))?;
            IR::Slice { input, offset, len }
        },
        DslPlan::DataFrameScan { df, schema } => IR::DataFrameScan {
            df,
            schema,
            output_schema: None,
        },
        DslPlan::Select {
            expr,
            input,
            options,
        } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(select)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let (exprs, schema) = prepare_projection(expr, &input_schema, ctxt.opt_flags)
                .map_err(|e| e.context(failed_here!(select)))?;

            if exprs.is_empty() {
                ctxt.lp_arena.replace(input, empty_df());
                return Ok(input);
            }

            let eirs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;
            ctxt.conversion_optimizer
                .fill_scratch(&eirs, ctxt.expr_arena);

            let schema = Arc::new(schema);
            let lp = IR::Select {
                expr: eirs,
                input,
                schema,
                options,
            };

            return run_conversion(lp, ctxt, "select").map_err(|e| e.context(failed_here!(select)));
        },
        DslPlan::Sort {
            input,
            by_column,
            slice,
            mut sort_options,
        } => {
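            // If a single `Expr::Columns` is given, count its individual columns so
            // the length checks below compare against the real number of sort keys.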
            let n_by_exprs = if by_column.len() == 1 {
                match &by_column[0] {
                    Expr::Columns(cols) => cols.len(),
                    _ => 1,
                }
            } else {
                by_column.len()
            };
            let n_desc = sort_options.descending.len();
            polars_ensure!(
                n_desc == n_by_exprs || n_desc == 1,
                ComputeError: "the length of `descending` ({}) does not match the length of `by` ({})", n_desc, by_column.len()
            );
            let n_nulls_last = sort_options.nulls_last.len();
            polars_ensure!(
                n_nulls_last == n_by_exprs || n_nulls_last == 1,
                ComputeError: "the length of `nulls_last` ({}) does not match the length of `by` ({})", n_nulls_last, by_column.len()
            );

            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sort)))?;

            let mut expanded_cols = Vec::new();
            let mut nulls_last = Vec::new();
            let mut descending = Vec::new();

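            // `nulls_last`/`descending` may each hold a single default value; cycling
            // them keeps the zip aligned when expressions expand to multiple columns.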
            for (c, (&n, &d)) in by_column.into_iter().zip(
                sort_options
                    .nulls_last
                    .iter()
                    .cycle()
                    .zip(sort_options.descending.iter().cycle()),
            ) {
                let exprs = expand_expressions(
                    input,
                    vec![c],
                    ctxt.lp_arena,
                    ctxt.expr_arena,
                    ctxt.opt_flags,
                )
                .map_err(|e| e.context(failed_here!(sort)))?;

                nulls_last.extend(std::iter::repeat_n(n, exprs.len()));
                descending.extend(std::iter::repeat_n(d, exprs.len()));
                expanded_cols.extend(exprs);
            }
            sort_options.nulls_last = nulls_last;
            sort_options.descending = descending;

            ctxt.conversion_optimizer
                .fill_scratch(&expanded_cols, ctxt.expr_arena);
            let mut by_column = expanded_cols;

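            // In a multi-column sort, drop keys with a Null dtype: they cannot affect
            // the order, but at least one key must always remain.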
            if by_column.len() > 1 {
                let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

                let mut null_columns = vec![];

                for (i, c) in by_column.iter().enumerate() {
                    if let DataType::Null =
                        c.dtype(&input_schema, Context::Default, ctxt.expr_arena)?
                    {
                        null_columns.push(i);
                    }
                }
                if null_columns.len() == by_column.len() {
                    by_column.truncate(1);
                    sort_options.nulls_last.truncate(1);
                    sort_options.descending.truncate(1);
                } else if !null_columns.is_empty() {
                    for i in null_columns.into_iter().rev() {
                        by_column.remove(i);
                        sort_options.nulls_last.remove(i);
                        sort_options.descending.remove(i);
                    }
                }
            };

            let lp = IR::Sort {
                input,
                by_column,
                slice,
                sort_options,
            };

            return run_conversion(lp, ctxt, "sort").map_err(|e| e.context(failed_here!(sort)));
        },
        DslPlan::Cache { input } => {
            let id = UniqueId::from_arc(input.clone());
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(cache)))?;
            IR::Cache {
                input,
                id,
                cache_hits: crate::constants::UNLIMITED_CACHE,
            }
        },
        DslPlan::GroupBy {
            input,
            keys,
            aggs,
            apply,
            maintain_order,
            options,
        } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(group_by)))?;

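            // For an eager rolling group-by with keys, projection pushdown is
            // enabled so unused columns can be pruned during this conversion.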
            if ctxt.opt_flags.eager() && options.is_rolling() && !keys.is_empty() {
                ctxt.opt_flags.insert(OptFlags::PROJECTION_PUSHDOWN)
            }

            let (keys, aggs, schema) = resolve_group_by(
                input,
                keys,
                aggs,
                &options,
                ctxt.lp_arena,
                ctxt.expr_arena,
                ctxt.opt_flags,
            )
            .map_err(|e| e.context(failed_here!(group_by)))?;

            let (apply, schema) = if let Some((apply, schema)) = apply {
                (Some(apply), schema)
            } else {
                (None, schema)
            };

            ctxt.conversion_optimizer
                .fill_scratch(&keys, ctxt.expr_arena);
            ctxt.conversion_optimizer
                .fill_scratch(&aggs, ctxt.expr_arena);

            let lp = IR::GroupBy {
                input,
                keys,
                aggs,
                schema,
                apply,
                maintain_order,
                options,
            };

            return run_conversion(lp, ctxt, "group_by")
                .map_err(|e| e.context(failed_here!(group_by)));
        },
        DslPlan::Join {
            input_left,
            input_right,
            left_on,
            right_on,
            predicates,
            options,
        } => {
            return join::resolve_join(
                Either::Left(input_left),
                Either::Left(input_right),
                left_on,
                right_on,
                predicates,
                JoinOptionsIR::from(Arc::unwrap_or_clone(options)),
                ctxt,
            )
            .map_err(|e| e.context(failed_here!(join)))
            .map(|t| t.0);
        },
        DslPlan::HStack {
            input,
            exprs,
            options,
        } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(with_columns)))?;
            let (exprs, schema) =
                resolve_with_columns(exprs, input, ctxt.lp_arena, ctxt.expr_arena, ctxt.opt_flags)
                    .map_err(|e| e.context(failed_here!(with_columns)))?;

            ctxt.conversion_optimizer
                .fill_scratch(&exprs, ctxt.expr_arena);
            let lp = IR::HStack {
                input,
                exprs,
                schema,
                options,
            };
            return run_conversion(lp, ctxt, "with_columns");
        },
        DslPlan::MatchToSchema {
            input,
            match_schema,
            per_column,
            extra_columns,
        } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(match_to_schema)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            assert_eq!(per_column.len(), match_schema.len());

            if input_schema.as_ref() == &match_schema {
                return Ok(input);
            }

            let mut exprs = Vec::with_capacity(match_schema.len());
            let mut found_missing_columns = Vec::new();
            let mut used_input_columns = 0;

            for ((column, dtype), per_column) in match_schema.iter().zip(per_column.iter()) {
                match input_schema.get(column) {
                    None => match &per_column.missing_columns {
                        MissingColumnsPolicyOrExpr::Raise => found_missing_columns.push(column),
                        MissingColumnsPolicyOrExpr::Insert => exprs.push(Expr::Alias(
                            Arc::new(Expr::Literal(LiteralValue::Scalar(Scalar::null(
                                dtype.clone(),
                            )))),
                            column.clone(),
                        )),
                        MissingColumnsPolicyOrExpr::InsertWith(expr) => {
                            exprs.push(Expr::Alias(Arc::new(expr.clone()), column.clone()))
                        },
                    },
                    Some(input_dtype) if dtype == input_dtype => {
                        used_input_columns += 1;
                        exprs.push(Expr::Column(column.clone()))
                    },
                    Some(input_dtype) => {
                        let from_dtype = input_dtype;
                        let to_dtype = dtype;

                        let policy = CastColumnsPolicy {
                            integer_upcast: per_column.integer_cast == UpcastOrForbid::Upcast,
                            float_upcast: per_column.float_cast == UpcastOrForbid::Upcast,
                            float_downcast: false,
                            datetime_nanoseconds_downcast: false,
                            datetime_microseconds_downcast: false,
                            datetime_convert_timezone: false,
                            missing_struct_fields: per_column.missing_struct_fields,
                            extra_struct_fields: per_column.extra_struct_fields,
                        };

                        let should_cast =
                            policy.should_cast_column(column, to_dtype, from_dtype)?;

                        let mut expr = Expr::Column(PlSmallStr::from_str(column));
                        if should_cast {
                            expr = expr.cast_with_options(to_dtype.clone(), CastOptions::NonStrict);
                        }

                        used_input_columns += 1;
                        exprs.push(expr);
                    },
                }
            }

            if !found_missing_columns.is_empty() {
                use std::fmt::Write;
                let mut formatted = String::new();
                write!(&mut formatted, "\"{}\"", found_missing_columns[0]).unwrap();
                for c in &found_missing_columns[1..] {
                    write!(&mut formatted, ", \"{c}\"").unwrap();
                }

                polars_bail!(SchemaMismatch: "missing columns in `match_to_schema`: {formatted}");
            }

            if used_input_columns != input_schema.len()
                && extra_columns == ExtraColumnsPolicy::Raise
            {
                let found_extra_columns = input_schema
                    .iter_names()
                    .filter(|n| !match_schema.contains(n))
                    .collect::<Vec<_>>();

                use std::fmt::Write;
                let mut formatted = String::new();
                write!(&mut formatted, "\"{}\"", found_extra_columns[0]).unwrap();
                for c in &found_extra_columns[1..] {
                    write!(&mut formatted, ", \"{c}\"").unwrap();
                }

                polars_bail!(SchemaMismatch: "extra columns in `match_to_schema`: {formatted}");
            }

            let exprs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;

            ctxt.conversion_optimizer
                .fill_scratch(&exprs, ctxt.expr_arena);
            let lp = IR::Select {
                input,
                expr: exprs,
                schema: match_schema.clone(),
                options: ProjectionOptions {
                    run_parallel: true,
                    duplicate_check: false,
                    should_broadcast: true,
                },
            };
            return run_conversion(lp, ctxt, "match_to_schema");
        },
        DslPlan::Distinct { input, options } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(unique)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            let subset = options
                .subset
                .map(|s| {
                    let cols = expand_selectors(s, input_schema.as_ref(), &[])?;

                    for col in cols.iter() {
                        let _ = input_schema
                            .try_get(col)
                            .map_err(|_| polars_err!(col_not_found = col))?;
                    }

                    Ok::<_, PolarsError>(cols)
                })
                .transpose()?;

            let options = DistinctOptionsIR {
                subset,
                maintain_order: options.maintain_order,
                keep_strategy: options.keep_strategy,
                slice: None,
            };

            IR::Distinct { input, options }
        },
        DslPlan::MapFunction { input, function } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(format!("{}", function).to_lowercase())))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);

            match function {
                DslFunction::Explode {
                    columns,
                    allow_empty,
                } => {
                    let columns = expand_selectors(columns, &input_schema, &[])?;
                    validate_columns_in_input(columns.as_ref(), &input_schema, "explode")?;
                    polars_ensure!(!columns.is_empty() || allow_empty, InvalidOperation: "no columns provided in explode");
                    if columns.is_empty() {
                        return Ok(input);
                    }
                    let function = FunctionIR::Explode {
                        columns,
                        schema: Default::default(),
                    };
                    let ir = IR::MapFunction { input, function };
                    return Ok(ctxt.lp_arena.add(ir));
                },
                DslFunction::FillNan(fill_value) => {
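                    // Lowered to a `with_columns` that applies `fill_nan` to every
                    // float column; other dtypes cannot hold NaN values.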
                    let exprs = input_schema
                        .iter()
                        .filter_map(|(name, dtype)| match dtype {
                            DataType::Float32 | DataType::Float64 => Some(
                                col(name.clone())
                                    .fill_nan(fill_value.clone())
                                    .alias(name.clone()),
                            ),
                            _ => None,
                        })
                        .collect::<Vec<_>>();

                    let (exprs, schema) = resolve_with_columns(
                        exprs,
                        input,
                        ctxt.lp_arena,
                        ctxt.expr_arena,
                        ctxt.opt_flags,
                    )
                    .map_err(|e| e.context(failed_here!(fill_nan)))?;

                    ctxt.conversion_optimizer
                        .fill_scratch(&exprs, ctxt.expr_arena);

                    let lp = IR::HStack {
                        input,
                        exprs,
                        schema,
                        options: ProjectionOptions {
                            duplicate_check: false,
                            ..Default::default()
                        },
                    };
                    return run_conversion(lp, ctxt, "fill_nan");
                },
                DslFunction::Drop(DropFunction { to_drop, strict }) => {
                    let to_drop = expand_selectors(to_drop, &input_schema, &[])?;
                    let to_drop = to_drop.iter().map(|s| s.as_ref()).collect::<PlHashSet<_>>();

                    if strict {
                        for col_name in to_drop.iter() {
                            polars_ensure!(
                                input_schema.contains(col_name),
                                col_not_found = col_name
                            );
                        }
                    }

                    let mut output_schema =
                        Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len()));

                    for (col_name, dtype) in input_schema.iter() {
                        if !to_drop.contains(col_name.as_str()) {
                            output_schema.with_column(col_name.clone(), dtype.clone());
                        }
                    }

                    if output_schema.is_empty() {
                        ctxt.lp_arena.replace(input, empty_df());
                    }

                    IR::SimpleProjection {
                        input,
                        columns: Arc::new(output_schema),
                    }
                },
                DslFunction::Stats(sf) => {
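                    // Expands to one expression per column; columns the statistic does
                    // not apply to become typed nulls (see `stats_helper`).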
                    let exprs = match sf {
                        StatsFunction::Var { ddof } => stats_helper(
                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
                            |name| col(name.clone()).var(ddof),
                            &input_schema,
                        ),
                        StatsFunction::Std { ddof } => stats_helper(
                            |dt| dt.is_primitive_numeric() || dt.is_bool(),
                            |name| col(name.clone()).std(ddof),
                            &input_schema,
                        ),
                        StatsFunction::Quantile { quantile, method } => stats_helper(
                            |dt| dt.is_primitive_numeric(),
                            |name| col(name.clone()).quantile(quantile.clone(), method),
                            &input_schema,
                        ),
                        StatsFunction::Mean => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_temporal()
                                    || dt == &DataType::Boolean
                            },
                            |name| col(name.clone()).mean(),
                            &input_schema,
                        ),
                        StatsFunction::Sum => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_decimal()
                                    || matches!(dt, DataType::Boolean | DataType::Duration(_))
                            },
                            |name| col(name.clone()).sum(),
                            &input_schema,
                        ),
                        StatsFunction::Min => stats_helper(
                            |dt| dt.is_ord(),
                            |name| col(name.clone()).min(),
                            &input_schema,
                        ),
                        StatsFunction::Max => stats_helper(
                            |dt| dt.is_ord(),
                            |name| col(name.clone()).max(),
                            &input_schema,
                        ),
                        StatsFunction::Median => stats_helper(
                            |dt| {
                                dt.is_primitive_numeric()
                                    || dt.is_temporal()
                                    || dt == &DataType::Boolean
                            },
                            |name| col(name.clone()).median(),
                            &input_schema,
                        ),
                    };
                    let schema = Arc::new(expressions_to_schema(
                        &exprs,
                        &input_schema,
                        Context::Default,
                    )?);
                    let eirs = to_expr_irs(exprs, ctxt.expr_arena, &input_schema)?;

                    ctxt.conversion_optimizer
                        .fill_scratch(&eirs, ctxt.expr_arena);

                    let lp = IR::Select {
                        input,
                        expr: eirs,
                        schema,
                        options: ProjectionOptions {
                            duplicate_check: false,
                            ..Default::default()
                        },
                    };
                    return run_conversion(lp, ctxt, "stats");
                },
                DslFunction::Rename {
                    existing,
                    new,
                    strict,
                } => {
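                    // Renaming is lowered to a full projection: renamed columns are
                    // aliased, all others pass through unchanged.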
                    assert_eq!(existing.len(), new.len());
                    if existing.is_empty() {
                        return Ok(input);
                    }

                    let existing_lut =
                        PlIndexSet::from_iter(existing.iter().map(PlSmallStr::as_str));

                    let mut schema = Schema::with_capacity(input_schema.len());
                    let mut num_replaced = 0;

                    let expr = input_schema
                        .iter()
                        .map(|(n, dtype)| {
                            Ok(match existing_lut.get_index_of(n.as_str()) {
                                None => {
                                    schema.try_insert(n.clone(), dtype.clone())?;
                                    Expr::Column(n.clone())
                                },
                                Some(i) => {
                                    num_replaced += 1;
                                    schema.try_insert(new[i].clone(), dtype.clone())?;
                                    Expr::Column(n.clone()).alias(new[i].clone())
                                },
                            })
                        })
                        .collect::<PolarsResult<Vec<_>>>()?;

                    if strict && num_replaced != existing.len() {
                        let col = existing.iter().find(|c| !input_schema.contains(c)).unwrap();
                        polars_bail!(col_not_found = col);
                    }

                    if num_replaced == 0 {
                        return Ok(input);
                    }

                    let expr = to_expr_irs(expr, ctxt.expr_arena, &input_schema)?;
                    ctxt.conversion_optimizer
                        .fill_scratch(&expr, ctxt.expr_arena);

                    IR::Select {
                        input,
                        expr,
                        schema: Arc::new(schema),
                        options: ProjectionOptions {
                            run_parallel: false,
                            duplicate_check: false,
                            should_broadcast: false,
                        },
                    }
                },
                _ => {
                    let function = function.into_function_ir(&input_schema)?;
                    IR::MapFunction { input, function }
                },
            }
        },
        DslPlan::ExtContext { input, contexts } => {
            let input = to_alp_impl(owned(input), ctxt)
                .map_err(|e| e.context(failed_here!(with_context)))?;
            let contexts = contexts
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(with_context)))?;

            let mut schema = (**ctxt.lp_arena.get(input).schema(ctxt.lp_arena)).clone();
            for input in &contexts {
                let other_schema = ctxt.lp_arena.get(*input).schema(ctxt.lp_arena);
                for fld in other_schema.iter_fields() {
                    if schema.get(fld.name()).is_none() {
                        schema.with_column(fld.name, fld.dtype);
                    }
                }
            }

            IR::ExtContext {
                input,
                contexts,
                schema: Arc::new(schema),
            }
        },
        DslPlan::Sink { input, payload } => {
            let input =
                to_alp_impl(owned(input), ctxt).map_err(|e| e.context(failed_here!(sink)))?;
            let input_schema = ctxt.lp_arena.get(input).schema(ctxt.lp_arena);
            let payload = match payload {
                SinkType::Memory => SinkTypeIR::Memory,
                SinkType::File(f) => SinkTypeIR::File(f),
                SinkType::Partition(f) => SinkTypeIR::Partition(PartitionSinkTypeIR {
                    base_path: f.base_path,
                    file_path_cb: f.file_path_cb,
                    file_type: f.file_type,
                    sink_options: f.sink_options,
                    variant: match f.variant {
                        PartitionVariant::MaxSize(max_size) => {
                            PartitionVariantIR::MaxSize(max_size)
                        },
                        PartitionVariant::Parted {
                            key_exprs,
                            include_key,
                        } => {
                            let eirs = to_expr_irs(key_exprs, ctxt.expr_arena, &input_schema)?;
                            ctxt.conversion_optimizer
                                .fill_scratch(&eirs, ctxt.expr_arena);

                            PartitionVariantIR::Parted {
                                key_exprs: eirs,
                                include_key,
                            }
                        },
                        PartitionVariant::ByKey {
                            key_exprs,
                            include_key,
                        } => {
                            let eirs = to_expr_irs(key_exprs, ctxt.expr_arena, &input_schema)?;
                            ctxt.conversion_optimizer
                                .fill_scratch(&eirs, ctxt.expr_arena);

                            PartitionVariantIR::ByKey {
                                key_exprs: eirs,
                                include_key,
                            }
                        },
                    },
                    cloud_options: f.cloud_options,
                    per_partition_sort_by: match f.per_partition_sort_by {
                        None => None,
                        Some(sort_by) => Some(
                            sort_by
                                .into_iter()
                                .map(|s| {
                                    let expr = to_expr_ir(s.expr, ctxt.expr_arena, &input_schema)?;
                                    ctxt.conversion_optimizer
                                        .push_scratch(expr.node(), ctxt.expr_arena);
                                    Ok(SortColumnIR {
                                        expr,
                                        descending: s.descending,
                                        nulls_last: s.nulls_last,
                                    })
                                })
                                .collect::<PolarsResult<Vec<_>>>()?,
                        ),
                    },
                    finish_callback: f.finish_callback,
                }),
            };

            let lp = IR::Sink { input, payload };
            return run_conversion(lp, ctxt, "sink");
        },
        DslPlan::SinkMultiple { inputs } => {
            let inputs = inputs
                .into_iter()
                .map(|lp| to_alp_impl(lp, ctxt))
                .collect::<PolarsResult<Vec<_>>>()
                .map_err(|e| e.context(failed_here!(vertical concat)))?;
            IR::SinkMultiple { inputs }
        },
        #[cfg(feature = "merge_sorted")]
        DslPlan::MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let input_left = to_alp_impl(owned(input_left), ctxt)
                .map_err(|e| e.context(failed_here!(merge_sorted)))?;
            let input_right = to_alp_impl(owned(input_right), ctxt)
                .map_err(|e| e.context(failed_here!(merge_sorted)))?;

            IR::MergeSorted {
                input_left,
                input_right,
                key,
            }
        },
        DslPlan::IR { node, dsl, version } => {
            return if node.is_some()
                && version == ctxt.lp_arena.version()
                && ctxt.conversion_optimizer.used_arenas.insert(version)
            {
                Ok(node.unwrap())
            } else {
                to_alp_impl(owned(dsl), ctxt)
            };
        },
    };
    Ok(ctxt.lp_arena.add(v))
}

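/// Expands a predicate containing multi-output constructs (wildcards, regex
/// projections, selectors, dtype/index columns) against the schema, requiring
/// that it resolves to exactly one expression.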
fn expand_filter(
    predicate: Expr,
    input: Node,
    lp_arena: &Arena<IR>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<Expr> {
    let schema = lp_arena.get(input).schema(lp_arena);
    let predicate = if has_expr(&predicate, |e| match e {
        Expr::Column(name) => is_regex_projection(name),
        Expr::Wildcard
        | Expr::Selector(_)
        | Expr::RenameAlias { .. }
        | Expr::Columns(_)
        | Expr::DtypeColumn(_)
        | Expr::IndexColumn(_)
        | Expr::Nth(_) => true,
        #[cfg(feature = "dtype-struct")]
        Expr::Function {
            function: FunctionExpr::StructExpr(StructFunction::FieldByIndex(_)),
            ..
        } => true,
        _ => false,
    }) {
        let mut rewritten = rewrite_projections(vec![predicate], &schema, &[], opt_flags)?;
        match rewritten.len() {
            1 => rewritten.pop().unwrap(),
            0 => {
                let msg = "The predicate expanded to zero expressions. \
                    This may for example be caused by a regex not matching column names or \
                    a column dtype match not hitting any dtypes in the DataFrame";
                polars_bail!(ComputeError: msg);
            },
            _ => {
                let mut expanded = String::new();
                for e in rewritten.iter().take(5) {
                    expanded.push_str(&format!("\t{e:?},\n"))
                }
                expanded.pop();
                if rewritten.len() > 5 {
                    expanded.push_str("\t...\n")
                }

                let msg = if cfg!(feature = "python") {
                    format!(
                        "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
                        This is ambiguous. Try to combine the predicates with the 'all' or 'any' expression."
                    )
                } else {
                    format!(
                        "The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\
                        This is ambiguous. Try to combine the predicates with the 'all_horizontal' or 'any_horizontal' expression."
                    )
                };
                polars_bail!(ComputeError: msg)
            },
        }
    } else {
        predicate
    };
    expr_to_leaf_column_names_iter(&predicate)
        .try_for_each(|c| schema.try_index_of(&c).and(Ok(())))?;

    Ok(predicate)
}

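/// Resolves `with_columns` expressions: expands projections, rejects duplicate
/// output names, and extends the input schema with the new columns.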
fn resolve_with_columns(
    exprs: Vec<Expr>,
    input: Node,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let mut output_schema = (**input_schema).clone();
    let (exprs, _) = prepare_projection(exprs, &input_schema, opt_flags)?;
    let mut output_names = PlHashSet::with_capacity(exprs.len());

    let mut arena = Arena::with_capacity(8);
    for e in &exprs {
        let field = e
            .to_field_amortized(&input_schema, Context::Default, &mut arena)
            .unwrap();

        if !output_names.insert(field.name().clone()) {
            let msg = format!(
                "the name '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\
                It's possible that multiple expressions are returning the same default column name. \
                If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \
                duplicate column names.",
                field.name()
            );
            polars_bail!(ComputeError: msg)
        }
        output_schema.with_column(field.name, field.dtype.materialize_unknown(true)?);
        arena.clear();
    }

    let eirs = to_expr_irs(exprs, expr_arena, &input_schema)?;
    Ok((eirs, Arc::new(output_schema)))
}

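/// Resolves group-by keys and aggregations and computes the output schema
/// (keys first, then aggregation columns), validating for duplicate names.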
fn resolve_group_by(
    input: Node,
    keys: Vec<Expr>,
    aggs: Vec<Expr>,
    _options: &GroupbyOptions,
    lp_arena: &Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    opt_flags: &mut OptFlags,
) -> PolarsResult<(Vec<ExprIR>, Vec<ExprIR>, SchemaRef)> {
    let input_schema = lp_arena.get(input).schema(lp_arena);
    let input_schema = input_schema.as_ref();
    let mut keys = rewrite_projections(keys, input_schema, &[], opt_flags)?;

    let mut output_schema = expressions_to_schema(&keys, input_schema, Context::Default)?;

    #[allow(unused_mut)]
    let mut pop_keys = false;
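    // For rolling/dynamic group-by, the index column is temporarily pushed as a
    // key so aggregation expansion can see it; it is popped again further below.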
    #[cfg(feature = "dynamic_group_by")]
    {
        if let Some(options) = _options.rolling.as_ref() {
            let name = options.index_column.clone();
            let dtype = input_schema.try_get(name.as_str())?;
            keys.push(col(name.clone()));
            pop_keys = true;
            output_schema.with_column(name.clone(), dtype.clone());
        } else if let Some(options) = _options.dynamic.as_ref() {
            let name = options.index_column.clone();
            keys.push(col(name.clone()));
            pop_keys = true;
            let dtype = input_schema.try_get(name.as_str())?;
            if options.include_boundaries {
                output_schema.with_column("_lower_boundary".into(), dtype.clone());
                output_schema.with_column("_upper_boundary".into(), dtype.clone());
            }
            output_schema.with_column(name.clone(), dtype.clone());
        }
    }
    let keys_index_len = output_schema.len();

    let aggs = rewrite_projections(aggs, input_schema, &keys, opt_flags)?;
    if pop_keys {
        let _ = keys.pop();
    }

    let aggs_schema = expressions_to_schema(&aggs, input_schema, Context::Aggregation)?;
    output_schema.merge(aggs_schema);

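    // Fewer output columns than keys + aggs means at least two expressions share
    // an output name; report it as a duplicate-name error.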
    if output_schema.len() < (keys_index_len + aggs.len()) {
        let mut names = PlHashSet::with_capacity(output_schema.len());
        for expr in aggs.iter().chain(keys.iter()) {
            let name = expr_output_name(expr)?;
            polars_ensure!(names.insert(name.clone()), duplicate = name)
        }
    }
    let keys = to_expr_irs(keys, expr_arena, input_schema)?;
    let aggs = to_expr_irs(aggs, expr_arena, input_schema)?;
    validate_expressions(&keys, expr_arena, input_schema, "group by")?;
    validate_expressions(&aggs, expr_arena, input_schema, "group by")?;

    Ok((keys, aggs, Arc::new(output_schema)))
}
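
/// Maps each column of `schema` through `expr` when `condition` holds for its
/// dtype, and to a typed null literal otherwise, preserving the full schema.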
fn stats_helper<F, E>(condition: F, expr: E, schema: &Schema) -> Vec<Expr>
where
    F: Fn(&DataType) -> bool,
    E: Fn(&PlSmallStr) -> Expr,
{
    schema
        .iter()
        .map(|(name, dt)| {
            if condition(dt) {
                expr(name)
            } else {
                lit(NULL).cast(dt.clone()).alias(name.clone())
            }
        })
        .collect()
}

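/// Returns a projection that excludes hive-partition columns so they are not
/// read from the files themselves; `None` when there are no hive parts or no
/// overlap between the reader schema and the hive schema.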
pub(crate) fn maybe_init_projection_excluding_hive(
    reader_schema: &Either<ArrowSchemaRef, SchemaRef>,
    hive_parts: Option<&SchemaRef>,
) -> Option<Arc<[PlSmallStr]>> {
    let hive_schema = hive_parts?;

    match &reader_schema {
        Either::Left(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
        Either::Right(reader_schema) => hive_schema
            .iter_names()
            .any(|x| reader_schema.contains(x))
            .then(|| {
                reader_schema
                    .iter_names_cloned()
                    .filter(|x| !hive_schema.contains(x))
                    .collect::<Arc<[_]>>()
            }),
    }
}