1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::global::_set_n_rows_for_scan;
5use polars_plan::plans::expr_ir::ExprIR;
6use polars_utils::format_pl_smallstr;
7use recursive::recursive;
8
9use self::expr_ir::OutputName;
10use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
11#[cfg(feature = "python")]
12use self::python_dsl::PythonScanSource;
13use super::super::executors::{self, Executor};
14use super::*;
15use crate::ScanPredicate;
16use crate::executors::{CachePrefiller, SinkExecutor};
17use crate::predicate::PhysicalColumnPredicates;
18
/// Callback used to lower an `IR` node with the streaming engine instead of
/// the in-memory executors in this module. Injected by the caller so this
/// crate does not depend on the streaming engine directly.
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
21
22fn partitionable_gb(
23 keys: &[ExprIR],
24 aggs: &[ExprIR],
25 input_schema: &Schema,
26 expr_arena: &Arena<AExpr>,
27 apply: &Option<Arc<dyn DataFrameUdf>>,
28) -> bool {
29 if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
36 for key in keys {
39 if (expr_arena).iter(key.node()).count() > 1
40 || has_aexpr(key.node(), expr_arena, |ae| match ae {
41 AExpr::Literal(lv) => !lv.is_scalar(),
42 _ => false,
43 })
44 {
45 return false;
46 }
47 }
48
49 can_pre_agg_exprs(aggs, expr_arena, input_schema)
50 } else {
51 false
52 }
53}
54
/// State threaded through physical-plan conversion to track cache topology.
#[derive(Clone)]
struct ConversionState {
    // Set when a `Cache` node was encountered somewhere in the current branch.
    has_cache_child: bool,
    // Set when a `Cache` node sits above the node currently being converted;
    // such subtrees may be visited more than once, so the IR must be cloned
    // from the arena instead of taken.
    has_cache_parent: bool,
}
60
61impl ConversionState {
62 fn new() -> PolarsResult<Self> {
63 Ok(ConversionState {
64 has_cache_child: false,
65 has_cache_parent: false,
66 })
67 }
68
69 fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
70 let mut new_state = self.clone();
71 new_state.has_cache_child = false;
72 let out = func(&mut new_state);
73 self.has_cache_child = new_state.has_cache_child;
74 out
75 }
76}
77
78pub fn create_physical_plan(
79 root: Node,
80 lp_arena: &mut Arena<IR>,
81 expr_arena: &mut Arena<AExpr>,
82 build_streaming_executor: Option<StreamingExecutorBuilder>,
83) -> PolarsResult<Box<dyn Executor>> {
84 let mut state = ConversionState::new()?;
85 let mut cache_nodes = Default::default();
86 let plan = create_physical_plan_impl(
87 root,
88 lp_arena,
89 expr_arena,
90 &mut state,
91 &mut cache_nodes,
92 build_streaming_executor,
93 )?;
94
95 if cache_nodes.is_empty() {
96 Ok(plan)
97 } else {
98 Ok(Box::new(CachePrefiller {
99 caches: cache_nodes,
100 phys_plan: plan,
101 }))
102 }
103}
104
/// Result of lowering several sink roots at once.
pub struct MultiplePhysicalPlans {
    // Executor that pre-fills caches shared between the plans; `None` when
    // the plans contain no `Cache` nodes. Run this before the plans.
    pub cache_prefiller: Option<Box<dyn Executor>>,
    // One physical plan per input root, in the same order.
    pub physical_plans: Vec<Box<dyn Executor>>,
}
109pub fn create_multiple_physical_plans(
110 roots: &[Node],
111 lp_arena: &mut Arena<IR>,
112 expr_arena: &mut Arena<AExpr>,
113 build_streaming_executor: Option<StreamingExecutorBuilder>,
114) -> PolarsResult<MultiplePhysicalPlans> {
115 let mut state = ConversionState::new()?;
116 let mut cache_nodes = Default::default();
117 let plans = state.with_new_branch(|new_state| {
118 roots
119 .iter()
120 .map(|&node| {
121 create_physical_plan_impl(
122 node,
123 lp_arena,
124 expr_arena,
125 new_state,
126 &mut cache_nodes,
127 build_streaming_executor,
128 )
129 })
130 .collect::<PolarsResult<Vec<_>>>()
131 })?;
132
133 let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
134 struct Empty;
135 impl Executor for Empty {
136 fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
137 Ok(DataFrame::empty())
138 }
139 }
140 Box::new(CachePrefiller {
141 caches: cache_nodes,
142 phys_plan: Box::new(Empty),
143 }) as _
144 });
145
146 Ok(MultiplePhysicalPlans {
147 cache_prefiller,
148 physical_plans: plans,
149 })
150}
151
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
/// Resolve the predicate of a Python scan.
///
/// For PyArrow sources the predicate is, when possible, translated into a
/// PyArrow expression string and pushed into `options.predicate` (returning no
/// physical predicate). Otherwise a physical expression is built, and for
/// non-PyArrow sources the predicate is additionally serialized so the Python
/// side can attempt its own pushdown.
///
/// Returns `(physical_predicate, serialized_predicate)`; both are `None` when
/// there is no Polars predicate to begin with.
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            // Try full pushdown: rewrite the predicate as a PyArrow
            // expression evaluated inside the scan itself.
            if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            ) {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // Pushdown succeeded: no post-scan filtering needed.
                None
            } else {
                // Pushdown failed: filter after the scan instead.
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        } else {
            // Non-PyArrow source: serialize the DSL expression for the Python
            // side and also build the physical filter as a fallback.
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}
203
/// Recursively lower the IR node `root` into a physical [`Executor`].
///
/// `state` tracks cache topology (see [`ConversionState`]); `cache_nodes`
/// collects one executor per distinct `Cache` id so the caller can wrap the
/// plan in a `CachePrefiller`. `build_streaming_executor` is used for scans
/// that are handled by the streaming engine.
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    cache_nodes: &mut PlIndexMap<usize, Box<dyn Executor>>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    // Shorthand for the recursive call with all context threaded through.
    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    // Subtrees under a cache may be converted more than once, so they must be
    // cloned out of the arena. Scans are also cloned because the node may be
    // handed to the streaming builder below, which re-reads it from the arena.
    let logical_plan = if state.has_cache_parent || matches!(lp_arena.get(root), IR::Scan { .. }) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => {
            let input = recurse!(input, state)?;
            match payload {
                // In-memory sink: pass the frame through unchanged.
                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                    input,
                    name: "mem".to_string(),
                    f: Box::new(move |df, _state| Ok(Some(df))),
                })),
                SinkTypeIR::File(FileSinkType {
                    file_type,
                    target,
                    sink_options,
                    cloud_options,
                }) => {
                    let name: &'static str = match &file_type {
                        #[cfg(feature = "parquet")]
                        FileType::Parquet(_) => "parquet",
                        #[cfg(feature = "ipc")]
                        FileType::Ipc(_) => "ipc",
                        #[cfg(feature = "csv")]
                        FileType::Csv(_) => "csv",
                        #[cfg(feature = "json")]
                        FileType::Json(_) => "json",
                        #[allow(unreachable_patterns)]
                        _ => panic!("enable filetype feature"),
                    };

                    Ok(Box::new(SinkExecutor {
                        input,
                        name: name.to_string(),
                        // Runs once with the fully materialized frame; writes
                        // it out and returns `None` (the sink yields no df).
                        f: Box::new(move |mut df, _state| {
                            let mut file = target
                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
                            let writer = &mut *file;

                            use std::io::BufWriter;
                            match &file_type {
                                #[cfg(feature = "parquet")]
                                FileType::Parquet(options) => {
                                    use polars_io::parquet::write::ParquetWriter;
                                    ParquetWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_statistics(options.statistics)
                                        .with_row_group_size(options.row_group_size)
                                        .with_data_page_size(options.data_page_size)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "ipc")]
                                FileType::Ipc(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::ipc::IpcWriter;
                                    IpcWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_compat_level(options.compat_level)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "csv")]
                                FileType::Csv(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::csv::write::CsvWriter;
                                    CsvWriter::new(BufWriter::new(writer))
                                        .include_bom(options.include_bom)
                                        .include_header(options.include_header)
                                        .with_separator(options.serialize_options.separator)
                                        .with_line_terminator(
                                            options.serialize_options.line_terminator.clone(),
                                        )
                                        .with_quote_char(options.serialize_options.quote_char)
                                        .with_batch_size(options.batch_size)
                                        .with_datetime_format(
                                            options.serialize_options.datetime_format.clone(),
                                        )
                                        .with_date_format(
                                            options.serialize_options.date_format.clone(),
                                        )
                                        .with_time_format(
                                            options.serialize_options.time_format.clone(),
                                        )
                                        .with_float_scientific(
                                            options.serialize_options.float_scientific,
                                        )
                                        .with_float_precision(
                                            options.serialize_options.float_precision,
                                        )
                                        .with_null_value(options.serialize_options.null.clone())
                                        .with_quote_style(options.serialize_options.quote_style)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "json")]
                                FileType::Json(_options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::json::{JsonFormat, JsonWriter};

                                    JsonWriter::new(BufWriter::new(writer))
                                        .with_json_format(JsonFormat::JsonLines)
                                        .finish(&mut df)?;
                                },
                                #[allow(unreachable_patterns)]
                                _ => panic!("enable filetype feature"),
                            }

                            file.sync_on_close(sink_options.sync_on_close)?;
                            file.close()?;

                            Ok(None)
                        }),
                    }))
                },

                SinkTypeIR::Partition { .. } => {
                    polars_bail!(InvalidOperation:
                        "partition sinks not yet supported in standard engine."
                    )
                },
            }
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            // Each union input is an independent branch for cache tracking.
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            // A filter can run streaming (batch-wise) only when the predicate
            // is purely elementwise.
            let mut streamable =
                is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            if streamable {
                // Categorical columns are only safe batch-wise under the
                // global string cache.
                streamable = !input_schema
                    .iter_values()
                    .any(|dt| dt.contains_categoricals())
                    || {
                        #[cfg(feature = "dtype-categorical")]
                        {
                            polars_core::using_string_cache()
                        }

                        #[cfg(not(feature = "dtype-categorical"))]
                        {
                            false
                        }
                    }
            }
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate = create_physical_expr(
                &predicate,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            mut unified_scan_args,
        } => {
            // Clamp the scan slice with the globally configured row limit.
            unified_scan_args.pre_slice = if let Some(mut slice) = unified_scan_args.pre_slice {
                *slice.len_mut() = _set_n_rows_for_scan(Some(slice.len())).unwrap();
                Some(slice)
            } else {
                _set_n_rows_for_scan(None)
                    .map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len })
            };

            let mut state = ExpressionConversionState::new(true);

            // Skip-batch predicates are only useful when the format exposes
            // per-batch statistics (parquet with use_statistics).
            let mut create_skip_batch_predicate = false;
            #[cfg(feature = "parquet")]
            {
                create_skip_batch_predicate |= matches!(
                    &*scan_type,
                    FileScan::Parquet {
                        options: polars_io::prelude::ParquetOptions {
                            use_statistics: true,
                            ..
                        },
                        ..
                    }
                );
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        &mut state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScan::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: state.has_windows,
                    }))
                },
                #[allow(unreachable_patterns)]
                _ => {
                    // All other file scans are delegated to the streaming
                    // engine (`root` was cloned above, so the arena still
                    // holds the node for the builder to read).
                    let build_func = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");
                    return build_func(root, lp_arena, expr_arena);
                },
                // NOTE(review): dead arm — the catch-all above already
                // matches everything; kept as-is pending cleanup.
                #[allow(unreachable_patterns)]
                _ => unreachable!(),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            // Run expressions in parallel only when there are spare threads.
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr = create_physical_expressions_from_irs(
                &expr,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;

            // Vertical parallelism requires broadcasting semantics, purely
            // elementwise expressions, and at least one non-literal
            // projection.
            let allow_vertical_parallelism = options.should_broadcast
                && expr.iter().all(|e| {
                    is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena)
                })
                && !phys_expr.iter().all(|p| p.is_literal());

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                Context::Default,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
        Cache {
            input,
            id,
            cache_hits,
        } => {
            // Mark the topology: everything below is under a cache, and this
            // branch now contains a cache.
            state.has_cache_parent = true;
            state.has_cache_child = true;

            // Convert each distinct cache id only once; the filling executor
            // goes into `cache_nodes` for the prefiller.
            if !cache_nodes.contains_key(&id) {
                let input = recurse!(input, state)?;

                let cache = Box::new(executors::CacheExec {
                    id,
                    input: Some(input),
                    count: cache_hits,
                });

                cache_nodes.insert(id, cache);
            }

            // The in-plan node only reads the pre-filled cache.
            Ok(Box::new(executors::CacheExec {
                id,
                input: None,
                count: cache_hits,
            }))
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                Context::Aggregation,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            // Dynamic (time-window) group-by takes precedence.
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Then rolling group-by.
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                // Partitioned datasets hint that keys are pre-partitioned.
                let from_partitioned_ds = (&*lp_arena).iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let input = recurse!(input, state)?;
                // The partitioned exec also needs the logical (DSL) forms of
                // keys/aggs for re-planning per partition.
                let keys = keys
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                let aggs = aggs
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                Ok(Box::new(executors::PartitionGroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    options.slice,
                    input_schema,
                    schema,
                    from_partitioned_ds,
                    keys,
                    aggs,
                )))
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            // Both sides are separate branches for cache tracking.
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                Context::Default,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                Context::Default,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Compile extra join conditions (e.g. inequality joins) into a
            // post-join filter closure over the joined schema.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            Context::Default,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df._filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}
853
/// Build a [`ScanPredicate`] from an IR predicate expression.
///
/// Besides the main physical filter this optionally derives:
/// - a *skip-batch* predicate (`create_skip_batch_predicate`) that is
///   evaluated against per-batch min/max/null-count statistics to skip whole
///   batches, and
/// - per-column *column predicates* (`create_column_predicates`) for
///   column-level pushdown.
///
/// Setting `POLARS_OUTPUT_SKIP_BATCH_PRED=1` / `POLARS_OUTPUT_COLUMN_PREDS=1`
/// prints the derived predicates to stderr for debugging.
pub fn create_scan_predicate(
    predicate: &ExprIR,
    expr_arena: &mut Arena<AExpr>,
    schema: &Arc<Schema>,
    state: &mut ExpressionConversionState,
    create_skip_batch_predicate: bool,
    create_column_predicates: bool,
) -> PolarsResult<ScanPredicate> {
    let phys_predicate =
        create_physical_expr(predicate, Context::Default, expr_arena, schema, state)?;
    // Columns actually referenced by the predicate (leaf column names).
    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
        predicate.node(),
        expr_arena,
    )));

    let mut skip_batch_predicate = None;

    if create_skip_batch_predicate {
        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
            let expr = ExprIR::new(node, predicate.output_name_inner().clone());

            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
                eprintln!("predicate: {}", predicate.display(expr_arena));
                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
            }

            // The skip-batch predicate is evaluated over a statistics frame:
            // one `len` column plus `{col}_min`/`{col}_max`/`{col}_nc`
            // (null count) for every live column.
            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());

            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
            for (col, dtype) in schema.iter() {
                if !live_columns.contains(col) {
                    continue;
                }

                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
            }

            skip_batch_predicate = Some(create_physical_expr(
                &expr,
                Context::Default,
                expr_arena,
                &Arc::new(skip_batch_schema),
                state,
            )?);
        }
    }

    let column_predicates = if create_column_predicates {
        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
            eprintln!("column_predicates: {{");
            eprintln!("  [");
            for (pred, spec) in column_predicates.predicates.values() {
                eprintln!(
                    "    {} ({spec:?}),",
                    ExprIRDisplay::display_node(*pred, expr_arena)
                );
            }
            eprintln!("  ],");
            eprintln!(
                "  is_sumwise_complete: {}",
                column_predicates.is_sumwise_complete
            );
            eprintln!("}}");
        }
        // Lower every per-column predicate to a physical expression.
        PhysicalColumnPredicates {
            predicates: column_predicates
                .predicates
                .into_iter()
                .map(|(n, (p, s))| {
                    PolarsResult::Ok((
                        n,
                        (
                            create_physical_expr(
                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
                                Context::Default,
                                expr_arena,
                                schema,
                                state,
                            )?,
                            s,
                        ),
                    ))
                })
                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
            is_sumwise_complete: column_predicates.is_sumwise_complete,
        }
    } else {
        // No column-level pushdown requested: empty, explicitly incomplete.
        PhysicalColumnPredicates {
            predicates: PlHashMap::default(),
            is_sumwise_complete: false,
        }
    };

    PolarsResult::Ok(ScanPredicate {
        predicate: phys_predicate,
        live_columns,
        skip_batch_predicate,
        column_predicates,
    })
}