1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use polars_core::frame::DataFrame;
5use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
6use polars_core::schema::{Schema, SchemaRef};
7use polars_error::PolarsResult;
8use polars_io::RowIndex;
9use polars_io::cloud::CloudOptions;
10use polars_ops::frame::JoinArgs;
11use polars_plan::dsl::deletion::DeletionFilesList;
12use polars_plan::dsl::{
13 CastColumnsPolicy, FileSinkOptions, JoinTypeOptionsIR, MissingColumnsPolicy,
14 PartitionedSinkOptionsIR, PredicateFileSkip, ScanSources, TableStatistics,
15};
16use polars_plan::plans::expr_ir::ExprIR;
17use polars_plan::plans::hive::HivePartitionsDf;
18use polars_plan::plans::{AExpr, DataFrameUdf, IR};
19
20mod fmt;
21mod io;
22mod lower_expr;
23mod lower_group_by;
24mod lower_ir;
25mod to_graph;
26
27pub use fmt::{NodeStyle, visualize_plan};
28use polars_plan::prelude::PlanCallback;
29#[cfg(feature = "dynamic_group_by")]
30use polars_time::DynamicGroupOptions;
31use polars_time::{ClosedWindow, Duration};
32use polars_utils::arena::{Arena, Node};
33use polars_utils::pl_str::PlSmallStr;
34use polars_utils::slice_enum::Slice;
35use slotmap::{SecondaryMap, SlotMap};
36pub use to_graph::physical_plan_to_graph;
37
38pub use self::lower_ir::StreamingLowerIRContext;
39use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
40use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
41use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
42use crate::physical_plan::lower_expr::ExprCache;
43
slotmap::new_key_type! {
    /// Key identifying a [`PhysNode`] stored in a `SlotMap<PhysNodeKey, PhysNode>`.
    pub struct PhysNodeKey;
}
48
49impl PhysNodeKey {
50 pub fn as_ffi(&self) -> u64 {
51 self.0.as_ffi()
52 }
53}
54
/// A single node of the physical plan: the operation it performs (`kind`)
/// together with the schema recorded for its output (`output_schema`).
#[derive(Clone, Debug)]
pub struct PhysNode {
    // Schema attached to this node's output; `insert_multiplexers` copies it
    // onto any multiplexer placed behind this node.
    output_schema: Arc<Schema>,
    // The operation and its inputs/parameters.
    kind: PhysNodeKind,
}
64
65impl PhysNode {
66 pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {
67 Self {
68 output_schema,
69 kind,
70 }
71 }
72
73 pub fn kind(&self) -> &PhysNodeKind {
74 &self.kind
75 }
76}
77
/// Handle to one output of a physical node: the key of the producing node plus
/// the index of the output port on that node.
///
/// The handle is `Copy` and hashable, so it can be used directly as a map key
/// (as `insert_multiplexers` does).
#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
pub struct PhysStream {
    /// Node that produces the stream.
    pub node: PhysNodeKey,
    /// Output port on `node`; port 0 is the one returned by [`PhysStream::first`].
    pub port: usize,
}
86
87impl PhysStream {
88 #[allow(unused)]
89 pub fn new(node: PhysNodeKey, port: usize) -> Self {
90 Self { node, port }
91 }
92
93 pub fn first(node: PhysNodeKey) -> Self {
95 Self { node, port: 0 }
96 }
97}
98
/// Policy carried by `PhysNodeKind::Zip` (its `zip_behavior` field).
///
/// Serializable when the `serde` feature is enabled, and exposes a JSON schema
/// when `physical_plan_visualization_schema` is enabled.
// NOTE(review): the per-variant notes below are inferred from the variant names
// only; confirm them against the zip node implementation.
#[derive(Clone, Debug, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
    feature = "physical_plan_visualization_schema",
    derive(schemars::JsonSchema)
)]
pub enum ZipBehavior {
    // Presumably pads shorter inputs with nulls — TODO confirm.
    NullExtend,
    // Presumably broadcasts unit-length inputs to the length of the others — TODO confirm.
    Broadcast,
    // Presumably requires all inputs to have equal length — TODO confirm.
    Strict,
}
115
/// The operation performed by a [`PhysNode`], with its parameters and inputs.
///
/// Inputs are referenced as [`PhysStream`]s (producing node + output port).
/// Variants that hold neither a `PhysStream` nor a `PhysNodeKey` have no
/// inputs. Several variants only exist when the corresponding cargo feature is
/// enabled.
#[derive(Clone, Debug)]
pub enum PhysNodeKind {
    /// Source backed by an in-memory `DataFrame`; has no input stream.
    InMemorySource {
        df: Arc<DataFrame>,
        disable_morsel_split: bool,
    },

    /// Evaluates `selectors` against `input`.
    // NOTE(review): `extend_original` presumably keeps the input columns next
    // to the selected ones — confirm in the lowering code.
    Select {
        input: PhysStream,
        selectors: Vec<ExprIR>,
        extend_original: bool,
    },

    /// Evaluates `selectors` without any input stream.
    InputIndependentSelect {
        selectors: Vec<ExprIR>,
    },

    WithRowIndex {
        input: PhysStream,
        name: PlSmallStr,
        offset: Option<IdxSize>,
    },

    Reduce {
        input: PhysStream,
        exprs: Vec<ExprIR>,
    },

    /// Slice with a constant, unsigned offset and length.
    StreamingSlice {
        input: PhysStream,
        offset: usize,
        length: usize,
    },

    /// Slice with a constant, signed offset.
    // NOTE(review): presumably used when the offset counts from the end — confirm.
    NegativeSlice {
        input: PhysStream,
        offset: i64,
        length: usize,
    },

    /// Slice whose offset and length are supplied by streams instead of constants.
    DynamicSlice {
        input: PhysStream,
        offset: PhysStream,
        length: PhysStream,
    },

    /// Shift whose offset is supplied by a stream; `fill` is an optional third stream.
    Shift {
        input: PhysStream,
        offset: PhysStream,
        fill: Option<PhysStream>,
    },

    Filter {
        input: PhysStream,
        predicate: ExprIR,
    },

    /// Projection described purely by column names.
    SimpleProjection {
        input: PhysStream,
        columns: Vec<PlSmallStr>,
    },

    InMemorySink {
        input: PhysStream,
    },

    /// Sink that hands data to a `PlanCallback<DataFrame, bool>`.
    // NOTE(review): the meaning of the callback's `bool` result is not visible
    // here — confirm at the sink implementation.
    CallbackSink {
        input: PhysStream,
        function: PlanCallback<DataFrame, bool>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    },

    FileSink {
        input: PhysStream,
        options: FileSinkOptions,
    },

    PartitionedSink {
        input: PhysStream,
        options: PartitionedSinkOptionsIR,
    },

    /// Groups several sink nodes; they are referenced by node key rather than
    /// by stream.
    SinkMultiple {
        sinks: Vec<PhysNodeKey>,
    },

    /// Applies a `DataFrameUdf` to `input`.
    // NOTE(review): the name suggests the input is fully materialized in
    // memory before `map` runs — confirm.
    InMemoryMap {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,

        // Presumably a human-readable description used when displaying the
        // plan — TODO confirm.
        format_str: Option<String>,
    },

    /// Applies a `DataFrameUdf` to `input`.
    Map {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,

        // Presumably a human-readable description used when displaying the
        // plan — TODO confirm.
        format_str: Option<String>,
    },

    /// Group-by on a single named key column.
    // NOTE(review): the name implies the input is already sorted by `key` — confirm.
    SortedGroupBy {
        input: PhysStream,
        key: PlSmallStr,
        aggs: Vec<ExprIR>,
        slice: Option<(IdxSize, IdxSize)>,
    },

    Sort {
        input: PhysStream,
        by_column: Vec<ExprIR>,
        slice: Option<(i64, usize)>,
        sort_options: SortMultipleOptions,
    },

    /// Top-k whose `k` is supplied by a stream.
    // NOTE(review): `reverse` and `nulls_last` presumably hold one entry per
    // `by_column` expression — confirm.
    TopK {
        input: PhysStream,
        k: PhysStream,
        by_column: Vec<ExprIR>,
        reverse: Vec<bool>,
        nulls_last: Vec<bool>,
    },

    /// Both the value and the repeat count are supplied by streams.
    Repeat {
        value: PhysStream,
        repeats: PhysStream,
    },

    #[cfg(feature = "cum_agg")]
    CumAgg {
        input: PhysStream,
        kind: crate::nodes::cum_agg::CumAggKind,
    },

    GatherEvery {
        input: PhysStream,
        n: usize,
        offset: usize,
    },
    Rle(PhysStream),
    RleId(PhysStream),
    PeakMinMax {
        input: PhysStream,
        is_peak_max: bool,
    },

    OrderedUnion {
        inputs: Vec<PhysStream>,
    },

    UnorderedUnion {
        inputs: Vec<PhysStream>,
    },

    /// Combines several inputs according to `zip_behavior`.
    Zip {
        inputs: Vec<PhysStream>,
        zip_behavior: ZipBehavior,
    },

    /// Fan-out node. `insert_multiplexers` places one behind every stream that
    /// is consumed more than once and hands each consumer its own output port.
    #[allow(unused)]
    Multiplexer {
        input: PhysStream,
    },

    /// Scan over `scan_sources`; has no input stream.
    MultiScan {
        scan_sources: ScanSources,

        file_reader_builder: Arc<dyn FileReaderBuilder>,
        cloud_options: Option<Arc<CloudOptions>>,

        file_projection_builder: ProjectionBuilder,
        // NOTE(review): presumably the schema of what this node emits, as
        // opposed to `file_schema` below — confirm.
        output_schema: SchemaRef,

        row_index: Option<RowIndex>,
        pre_slice: Option<Slice>,
        predicate: Option<ExprIR>,
        predicate_file_skip_applied: Option<PredicateFileSkip>,

        hive_parts: Option<HivePartitionsDf>,
        include_file_paths: Option<PlSmallStr>,
        cast_columns_policy: CastColumnsPolicy,
        missing_columns_policy: MissingColumnsPolicy,
        forbid_extra_columns: Option<ForbidExtraColumns>,

        deletion_files: Option<DeletionFilesList>,
        table_statistics: Option<TableStatistics>,

        // NOTE(review): presumably the schema of the columns stored in the
        // files themselves — confirm.
        file_schema: SchemaRef,
        disable_morsel_split: bool,
    },

    /// Python-defined scan; has no input stream.
    #[cfg(feature = "python")]
    PythonScan {
        options: polars_plan::plans::python::PythonOptions,
    },

    /// Group-by over one or more inputs, with a key list and an aggregation
    /// list per input.
    // NOTE(review): `key_per_input` and `aggs_per_input` are presumably
    // index-aligned with `inputs` — confirm.
    GroupBy {
        inputs: Vec<PhysStream>,
        key_per_input: Vec<Vec<ExprIR>>,
        aggs_per_input: Vec<Vec<ExprIR>>,
    },

    #[cfg(feature = "dynamic_group_by")]
    DynamicGroupBy {
        input: PhysStream,
        options: DynamicGroupOptions,
        aggs: Vec<ExprIR>,
        slice: Option<(IdxSize, IdxSize)>,
    },

    #[cfg(feature = "dynamic_group_by")]
    RollingGroupBy {
        input: PhysStream,
        index_column: PlSmallStr,
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        slice: Option<(IdxSize, IdxSize)>,
        aggs: Vec<ExprIR>,
    },

    /// Join keyed by expressions on each side.
    EquiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
    },

    /// Join keyed by column names on each side.
    // NOTE(review): `descending` / `nulls_last` presumably describe the sort
    // order of the keys, and `keys_row_encoded` whether they were row-encoded
    // beforehand — confirm.
    MergeJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<PlSmallStr>,
        right_on: Vec<PlSmallStr>,
        descending: bool,
        nulls_last: bool,
        keys_row_encoded: bool,
        args: JoinArgs,
    },

    // NOTE(review): `output_bool` presumably switches the output to a boolean
    // match mask instead of the filtered rows — confirm.
    SemiAntiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        output_bool: bool,
    },

    CrossJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        args: JoinArgs,
    },

    // NOTE(review): the name suggests both inputs are fully materialized in
    // memory before joining — confirm.
    InMemoryJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        options: Option<JoinTypeOptionsIR>,
    },

    #[cfg(feature = "merge_sorted")]
    MergeSorted {
        input_left: PhysStream,
        input_right: PhysStream,
    },

    #[cfg(feature = "ewma")]
    EwmMean {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },

    #[cfg(feature = "ewma")]
    EwmVar {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },

    #[cfg(feature = "ewma")]
    EwmStd {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },
}
418
/// Walks every node reachable from `roots` by following input streams and
/// calls `visit` on each input stream of each node it processes.
///
/// A node is queued at most once after the start (tracked in `seen`), but a
/// stream consumed by several nodes is passed to `visit` once per consumer.
/// `roots` is assumed to contain no duplicates: a duplicated root would be
/// popped, and have its inputs visited, more than once.
///
/// `visit` receives a mutable reference and may rewrite the stream in place.
/// The traversal always continues through the node the stream referred to
/// *before* `visit` ran, because the node is queued first.
///
/// Indexing `phys_sm[node]` panics if a reachable key is not present in the map.
fn visit_node_inputs_mut(
    roots: Vec<PhysNodeKey>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    mut visit: impl FnMut(&mut PhysStream),
) {
    // Work stack (depth-first: nodes are popped from the back).
    let mut to_visit = roots;
    // Nodes that have already been queued; seeded with the roots.
    let mut seen: SecondaryMap<PhysNodeKey, ()> =
        to_visit.iter().copied().map(|n| (n, ())).collect();
    // Queue `$n` unless it was queued before. Written as a macro so it can use
    // `seen` and `to_visit` inside the loop that is popping from `to_visit`.
    macro_rules! rec {
        ($n:expr) => {
            let n = $n;
            if seen.insert(n, ()).is_none() {
                to_visit.push(n)
            }
        };
    }
    while let Some(node) = to_visit.pop() {
        match &mut phys_sm[node].kind {
            // Nodes without inputs: nothing to queue or visit.
            PhysNodeKind::InMemorySource { .. }
            | PhysNodeKind::MultiScan { .. }
            | PhysNodeKind::InputIndependentSelect { .. } => {},
            #[cfg(feature = "python")]
            PhysNodeKind::PythonScan { .. } => {},
            // Nodes with exactly one input stream.
            PhysNodeKind::Select { input, .. }
            | PhysNodeKind::WithRowIndex { input, .. }
            | PhysNodeKind::Reduce { input, .. }
            | PhysNodeKind::StreamingSlice { input, .. }
            | PhysNodeKind::NegativeSlice { input, .. }
            | PhysNodeKind::Filter { input, .. }
            | PhysNodeKind::SimpleProjection { input, .. }
            | PhysNodeKind::InMemorySink { input }
            | PhysNodeKind::CallbackSink { input, .. }
            | PhysNodeKind::FileSink { input, .. }
            | PhysNodeKind::PartitionedSink { input, .. }
            | PhysNodeKind::InMemoryMap { input, .. }
            | PhysNodeKind::SortedGroupBy { input, .. }
            | PhysNodeKind::Map { input, .. }
            | PhysNodeKind::Sort { input, .. }
            | PhysNodeKind::Multiplexer { input }
            | PhysNodeKind::GatherEvery { input, .. }
            | PhysNodeKind::Rle(input)
            | PhysNodeKind::RleId(input)
            | PhysNodeKind::PeakMinMax { input, .. } => {
                // Queue before visiting: `visit` may change `input.node`.
                rec!(input.node);
                visit(input);
            },

            // Feature-gated single-input nodes get their own arms so each can
            // carry its `cfg` attribute.
            #[cfg(feature = "dynamic_group_by")]
            PhysNodeKind::DynamicGroupBy { input, .. } => {
                rec!(input.node);
                visit(input);
            },
            #[cfg(feature = "dynamic_group_by")]
            PhysNodeKind::RollingGroupBy { input, .. } => {
                rec!(input.node);
                visit(input);
            },

            #[cfg(feature = "cum_agg")]
            PhysNodeKind::CumAgg { input, .. } => {
                rec!(input.node);
                visit(input);
            },

            // Two-input nodes: queue both producers, then visit left before right.
            PhysNodeKind::InMemoryJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::EquiJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::MergeJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::SemiAntiJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::CrossJoin {
                input_left,
                input_right,
                ..
            } => {
                rec!(input_left.node);
                rec!(input_right.node);
                visit(input_left);
                visit(input_right);
            },

            #[cfg(feature = "merge_sorted")]
            PhysNodeKind::MergeSorted {
                input_left,
                input_right,
                ..
            } => {
                rec!(input_left.node);
                rec!(input_right.node);
                visit(input_left);
                visit(input_right);
            },

            // Nodes whose parameters are themselves streams.
            PhysNodeKind::TopK { input, k, .. } => {
                rec!(input.node);
                rec!(k.node);
                visit(input);
                visit(k);
            },

            PhysNodeKind::DynamicSlice {
                input,
                offset,
                length,
            } => {
                rec!(input.node);
                rec!(offset.node);
                rec!(length.node);
                visit(input);
                visit(offset);
                visit(length);
            },

            PhysNodeKind::Shift {
                input,
                offset,
                fill,
            } => {
                rec!(input.node);
                rec!(offset.node);
                // `fill` is optional; only queue/visit it when present.
                if let Some(fill) = fill {
                    rec!(fill.node);
                }
                visit(input);
                visit(offset);
                if let Some(fill) = fill {
                    visit(fill);
                }
            },

            PhysNodeKind::Repeat { value, repeats } => {
                rec!(value.node);
                rec!(repeats.node);
                visit(value);
                visit(repeats);
            },

            // Nodes with an arbitrary number of input streams.
            PhysNodeKind::GroupBy { inputs, .. }
            | PhysNodeKind::OrderedUnion { inputs }
            | PhysNodeKind::UnorderedUnion { inputs }
            | PhysNodeKind::Zip { inputs, .. } => {
                for input in inputs {
                    rec!(input.node);
                    visit(input);
                }
            },

            PhysNodeKind::SinkMultiple { sinks } => {
                for sink in sinks {
                    rec!(*sink);
                    // Sinks are stored as bare node keys, so `visit` gets a
                    // temporary stream for port 0; any change `visit` makes to
                    // it is discarded.
                    visit(&mut PhysStream::first(*sink));
                }
            },

            #[cfg(feature = "ewma")]
            PhysNodeKind::EwmMean { input, options: _ }
            | PhysNodeKind::EwmVar { input, options: _ }
            | PhysNodeKind::EwmStd { input, options: _ } => {
                rec!(input.node);
                visit(input)
            },
        }
    }
}
597
598fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {
599 let mut refcount = PlHashMap::new();
600 visit_node_inputs_mut(roots.clone(), phys_sm, |i| {
601 *refcount.entry(*i).or_insert(0) += 1;
602 });
603
604 let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount
605 .into_iter()
606 .filter(|(_stream, refcount)| *refcount > 1)
607 .map(|(stream, _refcount)| {
608 let input_schema = phys_sm[stream.node].output_schema.clone();
609 let multiplexer_node = phys_sm.insert(PhysNode::new(
610 input_schema,
611 PhysNodeKind::Multiplexer { input: stream },
612 ));
613 (stream, PhysStream::first(multiplexer_node))
614 })
615 .collect();
616
617 visit_node_inputs_mut(roots, phys_sm, |i| {
618 if let Some(m) = multiplexer_map.get_mut(i) {
619 *i = *m;
620 m.port += 1;
621 }
622 });
623}
624
625pub fn build_physical_plan(
626 root: Node,
627 ir_arena: &mut Arena<IR>,
628 expr_arena: &mut Arena<AExpr>,
629 phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
630 ctx: StreamingLowerIRContext,
631) -> PolarsResult<PhysNodeKey> {
632 let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
633 let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
634 let mut cache_nodes = PlHashMap::new();
635 let phys_root = lower_ir::lower_ir(
636 root,
637 ir_arena,
638 expr_arena,
639 phys_sm,
640 &mut schema_cache,
641 &mut expr_cache,
642 &mut cache_nodes,
643 ctx,
644 None,
645 )?;
646 insert_multiplexers(vec![phys_root.node], phys_sm);
647 Ok(phys_root.node)
648}