1use polars_core::POOL;
2use polars_core::prelude::*;
3use polars_expr::state::ExecutionState;
4use polars_plan::global::_set_n_rows_for_scan;
5use polars_plan::plans::expr_ir::ExprIR;
6use polars_utils::format_pl_smallstr;
7use recursive::recursive;
8
9use self::expr_ir::OutputName;
10use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
11#[cfg(feature = "python")]
12use self::python_dsl::PythonScanSource;
13use super::super::executors::{self, Executor};
14use super::*;
15use crate::ScanPredicate;
16use crate::executors::{CachePrefiller, SinkExecutor};
17use crate::predicate::PhysicalColumnPredicates;
18
/// Callback that builds a streaming-engine executor for the subplan rooted at
/// the given `Node`. Supplied by the caller; absence indicates the streaming
/// engine is not compiled in (see the `expect` in the scan arm below).
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;
21
22fn partitionable_gb(
23 keys: &[ExprIR],
24 aggs: &[ExprIR],
25 input_schema: &Schema,
26 expr_arena: &Arena<AExpr>,
27 apply: &Option<Arc<dyn DataFrameUdf>>,
28) -> bool {
29 if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
36 for key in keys {
39 if (expr_arena).iter(key.node()).count() > 1
40 || has_aexpr(key.node(), expr_arena, |ae| match ae {
41 AExpr::Literal(lv) => !lv.is_scalar(),
42 _ => false,
43 })
44 {
45 return false;
46 }
47 }
48
49 can_pre_agg_exprs(aggs, expr_arena, input_schema)
50 } else {
51 false
52 }
53}
54
/// Flags threaded through plan conversion to track where cache nodes sit
/// relative to the node currently being lowered.
#[derive(Clone)]
struct ConversionState {
    /// Set when the branch being converted produced a cache node (a `Cache`
    /// IR node or a memoized streaming scan). Reset per branch by
    /// `with_new_branch`.
    has_cache_child: bool,
    /// Set once a cache ancestor exists; makes `create_physical_plan_impl`
    /// clone IR nodes instead of taking them out of the arena.
    has_cache_parent: bool,
}
60
61impl ConversionState {
62 fn new() -> PolarsResult<Self> {
63 Ok(ConversionState {
64 has_cache_child: false,
65 has_cache_parent: false,
66 })
67 }
68
69 fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
70 let mut new_state = self.clone();
71 new_state.has_cache_child = false;
72 let out = func(&mut new_state);
73 self.has_cache_child = new_state.has_cache_child;
74 out
75 }
76}
77
78pub fn create_physical_plan(
79 root: Node,
80 lp_arena: &mut Arena<IR>,
81 expr_arena: &mut Arena<AExpr>,
82 build_streaming_executor: Option<StreamingExecutorBuilder>,
83) -> PolarsResult<Box<dyn Executor>> {
84 let mut state = ConversionState::new()?;
85 let mut cache_nodes = Default::default();
86 let plan = create_physical_plan_impl(
87 root,
88 lp_arena,
89 expr_arena,
90 &mut state,
91 &mut cache_nodes,
92 build_streaming_executor,
93 )?;
94
95 if cache_nodes.is_empty() {
96 Ok(plan)
97 } else {
98 Ok(Box::new(CachePrefiller {
99 caches: cache_nodes,
100 phys_plan: plan,
101 }))
102 }
103}
104
/// Result of lowering several plan roots in one pass.
pub struct MultiplePhysicalPlans {
    /// Executor that prefills the cache nodes shared between the plans;
    /// `None` when no caches were produced.
    pub cache_prefiller: Option<Box<dyn Executor>>,
    /// One physical plan per input root, in the same order as the roots.
    pub physical_plans: Vec<Box<dyn Executor>>,
}
109pub fn create_multiple_physical_plans(
110 roots: &[Node],
111 lp_arena: &mut Arena<IR>,
112 expr_arena: &mut Arena<AExpr>,
113 build_streaming_executor: Option<StreamingExecutorBuilder>,
114) -> PolarsResult<MultiplePhysicalPlans> {
115 let mut state = ConversionState::new()?;
116 let mut cache_nodes = Default::default();
117 let plans = state.with_new_branch(|new_state| {
118 roots
119 .iter()
120 .map(|&node| {
121 create_physical_plan_impl(
122 node,
123 lp_arena,
124 expr_arena,
125 new_state,
126 &mut cache_nodes,
127 build_streaming_executor,
128 )
129 })
130 .collect::<PolarsResult<Vec<_>>>()
131 })?;
132
133 let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
134 struct Empty;
135 impl Executor for Empty {
136 fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
137 Ok(DataFrame::empty())
138 }
139 }
140 Box::new(CachePrefiller {
141 caches: cache_nodes,
142 phys_plan: Box::new(Empty),
143 }) as _
144 });
145
146 Ok(MultiplePhysicalPlans {
147 cache_prefiller,
148 physical_plans: plans,
149 })
150}
151
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
/// Compile the predicate of a Python scan.
///
/// For a PyArrow source, the predicate is first translated to a PyArrow
/// expression string and pushed into `options.predicate` (so the Python side
/// applies it); only when that translation fails is a Polars physical
/// expression built instead. For every other source the predicate is both
/// serialized (for transfer to the Python scan callback) and compiled to a
/// physical expression.
///
/// Returns `(physical_predicate, serialized_predicate)`; either may be `None`.
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            // Try to push the predicate down into PyArrow as an eval string.
            if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            ) {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // The filter now happens inside PyArrow; nothing to apply here.
                None
            } else {
                // Pushdown failed: fall back to filtering locally.
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        }
        else {
            // Non-PyArrow source: serialize the predicate for the Python side
            // and also compile it for local evaluation.
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        // No Polars predicate attached (either none, or already PyArrow).
        None
    };

    Ok((predicate, predicate_serialized))
}
203
/// Recursively lower the IR node `root` into an in-memory [`Executor`] tree.
///
/// `state` tracks cache ancestry (see [`ConversionState`]); `cache_nodes`
/// collects cache executors keyed by cache/scan id so shared subplans are
/// materialized once and later prefilled by `CachePrefiller`.
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    cache_nodes: &mut PlIndexMap<usize, Box<executors::CacheExec>>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    // Recurse with all arena/cache arguments threaded through implicitly.
    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    // With a cache ancestor the same IR node may be visited again, and scans
    // are rebuilt by the streaming builder from the arena — in both cases the
    // node must stay in the arena, so clone instead of take.
    let logical_plan = if state.has_cache_parent || matches!(lp_arena.get(root), IR::Scan { .. }) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => {
            let input = recurse!(input, state)?;
            match payload {
                // Memory sink: pass the frame through unchanged.
                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                    input,
                    name: "mem".to_string(),
                    f: Box::new(move |df, _state| Ok(Some(df))),
                })),
                SinkTypeIR::File(FileSinkType {
                    file_type,
                    target,
                    sink_options,
                    cloud_options,
                }) => {
                    let name: &'static str = match &file_type {
                        #[cfg(feature = "parquet")]
                        FileType::Parquet(_) => "parquet",
                        #[cfg(feature = "ipc")]
                        FileType::Ipc(_) => "ipc",
                        #[cfg(feature = "csv")]
                        FileType::Csv(_) => "csv",
                        #[cfg(feature = "json")]
                        FileType::Json(_) => "json",
                        #[allow(unreachable_patterns)]
                        _ => panic!("enable filetype feature"),
                    };

                    // The sink closure opens the target, writes the whole frame
                    // with the matching writer, then syncs and closes the file.
                    Ok(Box::new(SinkExecutor {
                        input,
                        name: name.to_string(),
                        f: Box::new(move |mut df, _state| {
                            let mut file = target
                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
                            let writer = &mut *file;

                            use std::io::BufWriter;
                            match &file_type {
                                #[cfg(feature = "parquet")]
                                FileType::Parquet(options) => {
                                    use polars_io::parquet::write::ParquetWriter;
                                    ParquetWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_statistics(options.statistics)
                                        .with_row_group_size(options.row_group_size)
                                        .with_data_page_size(options.data_page_size)
                                        .with_key_value_metadata(options.key_value_metadata.clone())
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "ipc")]
                                FileType::Ipc(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::ipc::IpcWriter;
                                    IpcWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_compat_level(options.compat_level)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "csv")]
                                FileType::Csv(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::csv::write::CsvWriter;
                                    CsvWriter::new(BufWriter::new(writer))
                                        .include_bom(options.include_bom)
                                        .include_header(options.include_header)
                                        .with_separator(options.serialize_options.separator)
                                        .with_line_terminator(
                                            options.serialize_options.line_terminator.clone(),
                                        )
                                        .with_quote_char(options.serialize_options.quote_char)
                                        .with_batch_size(options.batch_size)
                                        .with_datetime_format(
                                            options.serialize_options.datetime_format.clone(),
                                        )
                                        .with_date_format(
                                            options.serialize_options.date_format.clone(),
                                        )
                                        .with_time_format(
                                            options.serialize_options.time_format.clone(),
                                        )
                                        .with_float_scientific(
                                            options.serialize_options.float_scientific,
                                        )
                                        .with_float_precision(
                                            options.serialize_options.float_precision,
                                        )
                                        .with_null_value(options.serialize_options.null.clone())
                                        .with_quote_style(options.serialize_options.quote_style)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "json")]
                                FileType::Json(_options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::json::{JsonFormat, JsonWriter};

                                    // Always newline-delimited JSON; `_options`
                                    // carries no settings used here.
                                    JsonWriter::new(BufWriter::new(writer))
                                        .with_json_format(JsonFormat::JsonLines)
                                        .finish(&mut df)?;
                                },
                                #[allow(unreachable_patterns)]
                                _ => panic!("enable filetype feature"),
                            }

                            file.sync_on_close(sink_options.sync_on_close)?;
                            file.close()?;

                            // File sinks produce no in-memory result.
                            Ok(None)
                        }),
                    }))
                },

                SinkTypeIR::Partition { .. } => {
                    polars_bail!(InvalidOperation:
                        "partition sinks not yet supported in standard engine."
                    )
                },
            }
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            // Each union input is converted in its own cache branch.
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            // The filter may run batch-wise only for element-wise predicates.
            let mut streamable =
                is_elementwise_rec_no_cat_cast(expr_arena.get(predicate.node()), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            if streamable {
                // Categorical columns are only safe to process per-batch when
                // the global string cache is active.
                streamable = !input_schema
                    .iter_values()
                    .any(|dt| dt.contains_categoricals())
                    || {
                        #[cfg(feature = "dtype-categorical")]
                        {
                            polars_core::using_string_cache()
                        }

                        #[cfg(not(feature = "dtype-categorical"))]
                        {
                            false
                        }
                    }
            }
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate = create_physical_expr(
                &predicate,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            mut unified_scan_args,
            id: scan_mem_id,
        } => {
            // Clamp the scan's slice length to the global n-rows override.
            unified_scan_args.pre_slice = if let Some(mut slice) = unified_scan_args.pre_slice {
                *slice.len_mut() = _set_n_rows_for_scan(Some(slice.len())).unwrap();
                Some(slice)
            } else {
                _set_n_rows_for_scan(None)
                    .map(|len| polars_utils::slice_enum::Slice::Positive { offset: 0, len })
            };

            let mut expr_conversion_state = ExpressionConversionState::new(true);

            // Skip-batch predicates are only built for parquet scans that use
            // row-group statistics.
            let mut create_skip_batch_predicate = false;
            #[cfg(feature = "parquet")]
            {
                create_skip_batch_predicate |= matches!(
                    &*scan_type,
                    FileScan::Parquet {
                        options: polars_io::prelude::ParquetOptions {
                            use_statistics: true,
                            ..
                        },
                        ..
                    }
                );
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScan::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                #[allow(unreachable_patterns)]
                _ => {
                    // All other scan types run on the streaming engine and are
                    // memoized in `cache_nodes` keyed by the scan's memory id,
                    // so a plan referencing the same scan twice reads it once.
                    state.has_cache_parent = true;
                    state.has_cache_child = true;

                    let scan_mem_id: usize = scan_mem_id.to_usize();

                    if !cache_nodes.contains_key(&scan_mem_id) {
                        let build_func = build_streaming_executor
                            .expect("invalid build. Missing feature new-streaming");

                        // `root` was cloned (not taken) above, so the IR node
                        // is still in the arena for the streaming builder.
                        let executor = build_func(root, lp_arena, expr_arena)?;

                        cache_nodes.insert(
                            scan_mem_id,
                            Box::new(executors::CacheExec {
                                input: Some(executor),
                                id: scan_mem_id,
                                count: 0,
                                is_new_streaming_scan: true,
                            }),
                        );
                    } else {
                        // Scan already registered: bump its expected hit count.
                        let cache_exec = cache_nodes.get_mut(&scan_mem_id).unwrap();
                        cache_exec.count = cache_exec.count.saturating_add(1);
                    }

                    // Return a pointer node; the actual scan lives in
                    // `cache_nodes` and is prefilled by `CachePrefiller`.
                    Ok(Box::new(executors::CacheExec {
                        id: scan_mem_id,
                        input: None,
                        count: Default::default(),
                        is_new_streaming_scan: true,
                    }))
                },
                // NOTE(review): dead code — the wildcard arm above already
                // matches every remaining variant, so this arm can never run.
                #[allow(unreachable_patterns)]
                _ => unreachable!(),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            // Per-expression threading is only worthwhile when there are more
            // threads than expressions.
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr = create_physical_expressions_from_irs(
                &expr,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;

            // Vertical (row-chunk) parallelism requires broadcasting semantics,
            // all-element-wise expressions, and at least one non-literal.
            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena))
                && !phys_expr.iter().all(|p| {
                    p.is_literal()
                });

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            // A reduced output schema acts as a projection of column names.
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                Context::Default,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
        Cache {
            input,
            id,
            cache_hits,
        } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

            // First visit materializes the cache input; later visits only
            // return pointer nodes referencing the same id.
            if !cache_nodes.contains_key(&id) {
                let input = recurse!(input, state)?;

                let cache = Box::new(executors::CacheExec {
                    id,
                    input: Some(input),
                    count: cache_hits,
                    is_new_streaming_scan: false,
                });

                cache_nodes.insert(id, cache);
            }

            Ok(Box::new(executors::CacheExec {
                id,
                input: None,
                count: cache_hits,
                is_new_streaming_scan: false,
            }))
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                Context::Aggregation,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            // Dynamic (time-window) group-by takes precedence over the
            // standard strategies.
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Choose partitioned pre-aggregation when keys/aggs allow it.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                // Detect whether any upstream union originates from a
                // partitioned dataset.
                let from_partitioned_ds = (&*lp_arena).iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let input = recurse!(input, state)?;
                let keys = keys
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                let aggs = aggs
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                Ok(Box::new(executors::PartitionGroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    options.slice,
                    input_schema,
                    schema,
                    from_partitioned_ds,
                    keys,
                    aggs,
                )))
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            // Both join sides get their own cache branch.
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                Context::Default,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                Context::Default,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Extra join options may carry a predicate; compile it into a
            // closure that filters the joined frame.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            Context::Default,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df._filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec_no_cat_cast(expr_arena.get(e.node()), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}
897
/// Compile a scan predicate into a [`ScanPredicate`]:
/// the physical predicate itself, the set of live (referenced) columns, an
/// optional skip-batch predicate evaluated against per-batch statistics, and
/// optional per-column predicates.
///
/// `create_skip_batch_predicate` / `create_column_predicates` gate the two
/// optional derived predicates.
pub fn create_scan_predicate(
    predicate: &ExprIR,
    expr_arena: &mut Arena<AExpr>,
    schema: &Arc<Schema>,
    state: &mut ExpressionConversionState,
    create_skip_batch_predicate: bool,
    create_column_predicates: bool,
) -> PolarsResult<ScanPredicate> {
    let phys_predicate =
        create_physical_expr(predicate, Context::Default, expr_arena, schema, state)?;
    // Columns the predicate actually references (leaf column names).
    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
        predicate.node(),
        expr_arena,
    )));

    let mut skip_batch_predicate = None;

    if create_skip_batch_predicate {
        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
            let expr = ExprIR::new(node, predicate.output_name_inner().clone());

            // Debug aid: dump both predicates when the env var is set.
            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
                eprintln!("predicate: {}", predicate.display(expr_arena));
                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
            }

            // The skip-batch predicate is evaluated against a statistics
            // frame: a "len" column plus, per live column, `{col}_min`,
            // `{col}_max` and `{col}_nc` (an IDX_DTYPE count column).
            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());

            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
            for (col, dtype) in schema.iter() {
                if !live_columns.contains(col) {
                    continue;
                }

                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
            }

            skip_batch_predicate = Some(create_physical_expr(
                &expr,
                Context::Default,
                expr_arena,
                &Arc::new(skip_batch_schema),
                state,
            )?);
        }
    }

    let column_predicates = if create_column_predicates {
        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
        // Debug aid: print the decomposed per-column predicates when requested.
        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
            eprintln!("column_predicates: {{");
            eprintln!("  [");
            for (pred, spec) in column_predicates.predicates.values() {
                eprintln!(
                    "    {} ({spec:?}),",
                    ExprIRDisplay::display_node(*pred, expr_arena)
                );
            }
            eprintln!("  ],");
            eprintln!(
                "  is_sumwise_complete: {}",
                column_predicates.is_sumwise_complete
            );
            eprintln!("}}");
        }
        // Compile each per-column predicate node to a physical expression.
        PhysicalColumnPredicates {
            predicates: column_predicates
                .predicates
                .into_iter()
                .map(|(n, (p, s))| {
                    PolarsResult::Ok((
                        n,
                        (
                            create_physical_expr(
                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
                                Context::Default,
                                expr_arena,
                                schema,
                                state,
                            )?,
                            s,
                        ),
                    ))
                })
                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
            is_sumwise_complete: column_predicates.is_sumwise_complete,
        }
    } else {
        // Column predicates disabled: empty map, explicitly not sum-complete.
        PhysicalColumnPredicates {
            predicates: PlHashMap::default(),
            is_sumwise_complete: false,
        }
    };

    PolarsResult::Ok(ScanPredicate {
        predicate: phys_predicate,
        live_columns,
        skip_batch_predicate,
        column_predicates,
    })
}