Skip to main content

polars_stream/physical_plan/
mod.rs

1use std::num::NonZeroUsize;
2use std::sync::Arc;
3
4use polars_core::frame::DataFrame;
5use polars_core::prelude::{IdxSize, InitHashMaps, PlHashMap, SortMultipleOptions};
6use polars_core::schema::{Schema, SchemaRef};
7use polars_error::PolarsResult;
8use polars_io::RowIndex;
9use polars_io::cloud::CloudOptions;
10use polars_ops::frame::JoinArgs;
11use polars_plan::dsl::deletion::DeletionFilesList;
12use polars_plan::dsl::{
13    CastColumnsPolicy, FileSinkOptions, JoinTypeOptionsIR, MissingColumnsPolicy,
14    PartitionedSinkOptionsIR, PredicateFileSkip, ScanSources, TableStatistics,
15};
16use polars_plan::plans::expr_ir::ExprIR;
17use polars_plan::plans::hive::HivePartitionsDf;
18use polars_plan::plans::{AExpr, DataFrameUdf, IR};
19
20mod fmt;
21mod io;
22mod lower_expr;
23mod lower_group_by;
24mod lower_ir;
25mod to_graph;
26
27pub use fmt::{NodeStyle, visualize_plan};
28use polars_plan::prelude::PlanCallback;
29#[cfg(feature = "dynamic_group_by")]
30use polars_time::DynamicGroupOptions;
31use polars_time::{ClosedWindow, Duration};
32use polars_utils::arena::{Arena, Node};
33use polars_utils::pl_str::PlSmallStr;
34use polars_utils::slice_enum::Slice;
35use slotmap::{SecondaryMap, SlotMap};
36pub use to_graph::physical_plan_to_graph;
37
38pub use self::lower_ir::StreamingLowerIRContext;
39use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
40use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
41use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
42use crate::physical_plan::lower_expr::ExprCache;
43
44slotmap::new_key_type! {
45    /// Key used for physical nodes.
46    pub struct PhysNodeKey;
47}
48
49impl PhysNodeKey {
50    pub fn as_ffi(&self) -> u64 {
51        self.0.as_ffi()
52    }
53}
54
55/// A node in the physical plan.
56///
57/// A physical plan is created when the `IR` is translated to a directed
58/// acyclic graph of operations that can run on the streaming engine.
59#[derive(Clone, Debug)]
60pub struct PhysNode {
61    output_schema: Arc<Schema>,
62    kind: PhysNodeKind,
63}
64
65impl PhysNode {
66    pub fn new(output_schema: Arc<Schema>, kind: PhysNodeKind) -> Self {
67        Self {
68            output_schema,
69            kind,
70        }
71    }
72
73    pub fn kind(&self) -> &PhysNodeKind {
74        &self.kind
75    }
76}
77
78/// A handle representing a physical stream of data with a fixed schema in the
79/// physical plan. It consists of a reference to a physical node as well as the
80/// output port on that node to connect to receive the stream.
81#[derive(Clone, Debug, Copy, PartialEq, Eq, Hash)]
82pub struct PhysStream {
83    pub node: PhysNodeKey,
84    pub port: usize,
85}
86
87impl PhysStream {
88    #[allow(unused)]
89    pub fn new(node: PhysNodeKey, port: usize) -> Self {
90        Self { node, port }
91    }
92
93    // Convenience method to refer to the first output port of a physical node.
94    pub fn first(node: PhysNodeKey) -> Self {
95        Self { node, port: 0 }
96    }
97}
98
// NOTE: with the `physical_plan_visualization_schema` feature enabled, the
// `schemars::JsonSchema` derive below takes the `///` doc comments of this enum
// and its variants as schema descriptions. Editing those doc comments therefore
// changes the generated JSON schema; use `//` comments for internal notes.
/// Behaviour when handling multiple DataFrames with different heights.

#[derive(Clone, Debug, Copy)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(
    feature = "physical_plan_visualization_schema",
    derive(schemars::JsonSchema)
)]
pub enum ZipBehavior {
    /// Fill the shorter DataFrames with nulls to the height of the longest DataFrame.
    NullExtend,
    /// All inputs must be the same height, or have length 1 in which case they are broadcast.
    Broadcast,
    /// Raise an error if the DataFrames have different heights.
    Strict,
}
115
/// The operation performed by a [`PhysNode`], together with its parameters and
/// the input streams it consumes.
///
/// Every field of type [`PhysStream`] (including those inside `Option`/`Vec`)
/// is an input of the node, i.e. an edge of the physical plan pointing at the
/// producing node. `visit_node_inputs_mut` in this module enumerates all of
/// them, so a new variant with inputs must also be handled there.
#[derive(Clone, Debug)]
pub enum PhysNodeKind {
    /// Source backed by the in-memory `DataFrame` `df`; it has no input streams.
    InMemorySource {
        df: Arc<DataFrame>,
        disable_morsel_split: bool,
    },

    /// Evaluates `selectors` on `input`.
    ///
    /// NOTE(review): `extend_original` presumably keeps the input columns next
    /// to the selected ones — confirm against the node implementation.
    Select {
        input: PhysStream,
        selectors: Vec<ExprIR>,
        extend_original: bool,
    },

    /// Evaluates `selectors` without consuming any input stream.
    InputIndependentSelect {
        selectors: Vec<ExprIR>,
    },

    /// Adds a row-index column called `name` to `input`.
    ///
    /// NOTE(review): `offset` is presumably the first index value, with `None`
    /// meaning the default start — confirm.
    WithRowIndex {
        input: PhysStream,
        name: PlSmallStr,
        offset: Option<IdxSize>,
    },

    /// Reduces `input` using the expressions in `exprs`.
    Reduce {
        input: PhysStream,
        exprs: Vec<ExprIR>,
    },

    /// Slice of `input` with a constant, unsigned `offset` and `length`.
    StreamingSlice {
        input: PhysStream,
        offset: usize,
        length: usize,
    },

    /// Slice of `input` with a constant, signed (`i64`) `offset`.
    ///
    /// NOTE(review): presumably used for negative offsets, i.e. counted from
    /// the end of the input — confirm.
    NegativeSlice {
        input: PhysStream,
        offset: i64,
        length: usize,
    },

    /// Slice of `input` whose `offset` and `length` come from input streams
    /// rather than constants.
    DynamicSlice {
        input: PhysStream,
        offset: PhysStream,
        length: PhysStream,
    },

    /// Shift of `input` whose `offset` and optional `fill` value come from
    /// input streams rather than constants.
    Shift {
        input: PhysStream,
        offset: PhysStream,
        fill: Option<PhysStream>,
    },

    /// Filters `input` by `predicate`.
    Filter {
        input: PhysStream,
        predicate: ExprIR,
    },

    /// Projects `input` onto the columns named in `columns`.
    SimpleProjection {
        input: PhysStream,
        columns: Vec<PlSmallStr>,
    },

    /// Sink for `input`; presumably collects the result in memory (per the name).
    InMemorySink {
        input: PhysStream,
    },

    /// Sink that hands the data of `input` to the callback `function`.
    ///
    /// NOTE(review): the meaning of the callback's `bool` result, of
    /// `maintain_order` and of `chunk_size` is defined by the sink node, not
    /// here — confirm there.
    CallbackSink {
        input: PhysStream,
        function: PlanCallback<DataFrame, bool>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    },

    /// Sink writing `input` to a file, configured by `options`.
    FileSink {
        input: PhysStream,
        options: FileSinkOptions,
    },

    /// Sink writing `input` in partitions, configured by `options`.
    PartitionedSink {
        input: PhysStream,
        options: PartitionedSinkOptionsIR,
    },

    /// Groups several sink nodes under a single node.
    ///
    /// `sinks` holds node keys rather than streams; they are still traversed
    /// as dependencies of this node.
    SinkMultiple {
        sinks: Vec<PhysNodeKey>,
    },

    /// Generic fallback for (as-of-yet) unsupported streaming mappings.
    /// Fully sinks all data to an in-memory data frame and uses the in-memory
    /// engine to perform the map.
    InMemoryMap {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,

        /// A formatted explanation of what the map does. This is usually
        /// produced by calling format on the IR.
        format_str: Option<String>,
    },

    /// Applies `map` to `input`.
    ///
    /// NOTE(review): unlike `InMemoryMap`, presumably applied without first
    /// collecting all of `input` in memory — confirm.
    Map {
        input: PhysStream,
        map: Arc<dyn DataFrameUdf>,

        /// A formatted explanation of what the map does. This is usually
        /// produced by calling format on the IR.
        format_str: Option<String>,
    },

    /// Group-by on the single key column `key`, computing `aggs`.
    ///
    /// NOTE(review): presumably requires `input` to be sorted on `key`, and
    /// `slice` is an optional `(offset, length)` on the output — confirm.
    SortedGroupBy {
        input: PhysStream,
        key: PlSmallStr,
        aggs: Vec<ExprIR>,
        slice: Option<(IdxSize, IdxSize)>,
    },

    /// Sorts `input` by the `by_column` expressions according to `sort_options`.
    ///
    /// NOTE(review): `slice` is presumably an `(offset, length)` restriction of
    /// the sorted output — confirm.
    Sort {
        input: PhysStream,
        by_column: Vec<ExprIR>,
        slice: Option<(i64, usize)>,
        sort_options: SortMultipleOptions,
    },

    /// Top-k selection on `input` ordered by `by_column`, with `reverse` and
    /// `nulls_last` flags. `k` comes from an input stream rather than a constant.
    TopK {
        input: PhysStream,
        k: PhysStream,
        by_column: Vec<ExprIR>,
        reverse: Vec<bool>,
        nulls_last: Vec<bool>,
    },

    /// Repeats `value`; both `value` and `repeats` are input streams.
    Repeat {
        value: PhysStream,
        repeats: PhysStream,
    },

    /// Cumulative aggregation of `input`; `kind` selects the aggregation.
    #[cfg(feature = "cum_agg")]
    CumAgg {
        input: PhysStream,
        kind: crate::nodes::cum_agg::CumAggKind,
    },

    /// Takes every `n`-th row of `input`, starting at `offset`.
    GatherEvery {
        input: PhysStream,
        n: usize,
        offset: usize,
    },
    /// Run-length encoding. Parameter is the input stream.
    Rle(PhysStream),
    /// Run-length ids. Parameter is the input stream.
    RleId(PhysStream),
    /// Peak detection on `input`; `is_peak_max` presumably selects maxima
    /// (`true`) versus minima (`false`).
    PeakMinMax {
        input: PhysStream,
        is_peak_max: bool,
    },

    /// Concatenation of `inputs`, presumably preserving the order of the inputs.
    OrderedUnion {
        inputs: Vec<PhysStream>,
    },

    /// Concatenation of `inputs`, presumably without an ordering guarantee.
    UnorderedUnion {
        inputs: Vec<PhysStream>,
    },

    /// Combines `inputs` side by side; `zip_behavior` decides how inputs of
    /// different heights are handled.
    Zip {
        inputs: Vec<PhysStream>,
        zip_behavior: ZipBehavior,
    },

    /// Fans `input` out to several consumers, one output port per consumer.
    ///
    /// Inserted by `insert_multiplexers` for every stream that has more than
    /// one consumer.
    /// NOTE(review): this variant is constructed in `insert_multiplexers`, so
    /// the `#[allow(unused)]` below may be stale.
    #[allow(unused)]
    Multiplexer {
        input: PhysStream,
    },

    /// Scan over the files in `scan_sources`, read through `file_reader_builder`.
    /// It has no input streams.
    MultiScan {
        scan_sources: ScanSources,

        file_reader_builder: Arc<dyn FileReaderBuilder>,
        cloud_options: Option<Arc<CloudOptions>>,

        /// Columns to project from the file.
        file_projection_builder: ProjectionBuilder,
        /// Final output schema of morsels being sent out of MultiScan.
        output_schema: SchemaRef,

        row_index: Option<RowIndex>,
        pre_slice: Option<Slice>,
        predicate: Option<ExprIR>,
        predicate_file_skip_applied: Option<PredicateFileSkip>,

        hive_parts: Option<HivePartitionsDf>,
        include_file_paths: Option<PlSmallStr>,
        cast_columns_policy: CastColumnsPolicy,
        missing_columns_policy: MissingColumnsPolicy,
        forbid_extra_columns: Option<ForbidExtraColumns>,

        deletion_files: Option<DeletionFilesList>,
        table_statistics: Option<TableStatistics>,

        /// Schema of columns contained in the file. Does not contain external columns (e.g. hive / row_index).
        file_schema: SchemaRef,
        disable_morsel_split: bool,
    },

    /// Scan configured by Python-side `options`; it has no input streams.
    #[cfg(feature = "python")]
    PythonScan {
        options: polars_plan::plans::python::PythonOptions,
    },

    /// Group-by over one or more `inputs`, with one key list and one
    /// aggregation list per input.
    GroupBy {
        inputs: Vec<PhysStream>,
        /// Key expressions, one list per input. Must have the same schema when
        /// applied for each input.
        key_per_input: Vec<Vec<ExprIR>>,
        /// Aggregations, one list per input. Each must be a 'simple' expression:
        /// a singular column feeding into a single aggregate, or Len.
        aggs_per_input: Vec<Vec<ExprIR>>,
    },

    /// Dynamic (window-based) group-by on `input`, configured by `options`.
    #[cfg(feature = "dynamic_group_by")]
    DynamicGroupBy {
        input: PhysStream,
        options: DynamicGroupOptions,
        aggs: Vec<ExprIR>,
        slice: Option<(IdxSize, IdxSize)>,
    },

    /// Rolling group-by over `index_column` with windows described by
    /// `period`, `offset` and `closed`, computing `aggs`.
    #[cfg(feature = "dynamic_group_by")]
    RollingGroupBy {
        input: PhysStream,
        index_column: PlSmallStr,
        period: Duration,
        offset: Duration,
        closed: ClosedWindow,
        slice: Option<(IdxSize, IdxSize)>,
        aggs: Vec<ExprIR>,
    },

    /// Equi-join of `input_left` and `input_right` on `left_on` / `right_on`,
    /// configured by `args`.
    EquiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
    },

    /// Join on the key columns `left_on` / `right_on`, configured by `args`.
    ///
    /// NOTE(review): presumably requires both inputs to be sorted on the keys
    /// as described by `descending` / `nulls_last` — confirm.
    MergeJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<PlSmallStr>,
        right_on: Vec<PlSmallStr>,
        descending: bool,
        nulls_last: bool,
        keys_row_encoded: bool,
        args: JoinArgs,
    },

    /// Semi or anti join of `input_left` against `input_right`.
    ///
    /// NOTE(review): `output_bool` presumably emits a boolean membership result
    /// instead of filtering rows — confirm.
    SemiAntiJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        output_bool: bool,
    },

    /// Cross (Cartesian) join of `input_left` and `input_right`.
    CrossJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        args: JoinArgs,
    },

    /// Generic fallback for (as-of-yet) unsupported streaming joins.
    /// Fully sinks all data to in-memory data frames and uses the in-memory
    /// engine to perform the join.
    InMemoryJoin {
        input_left: PhysStream,
        input_right: PhysStream,
        left_on: Vec<ExprIR>,
        right_on: Vec<ExprIR>,
        args: JoinArgs,
        options: Option<JoinTypeOptionsIR>,
    },

    /// Merges `input_left` and `input_right`; presumably both are sorted — confirm.
    #[cfg(feature = "merge_sorted")]
    MergeSorted {
        input_left: PhysStream,
        input_right: PhysStream,
    },

    /// Exponentially weighted moving mean of `input`.
    #[cfg(feature = "ewma")]
    EwmMean {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },

    /// Exponentially weighted moving variance of `input`.
    #[cfg(feature = "ewma")]
    EwmVar {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },

    /// Exponentially weighted moving standard deviation of `input`.
    #[cfg(feature = "ewma")]
    EwmStd {
        input: PhysStream,
        options: polars_ops::series::EWMOptions,
    },
}
418
/// Calls `visit` on every input stream of every node reachable from `roots`.
///
/// The plan is walked from the roots towards their producers using an
/// explicit stack, so nodes are expanded in LIFO order rather than
/// breadth-first. Each reachable node is expanded once (`roots` is assumed to
/// contain no duplicates). `visit` is invoked once per input stream of that
/// node with a mutable reference, which lets the callback rewire the stream in
/// place.
///
/// The sequence of `visit` calls depends only on the graph and the order of
/// `roots`. `insert_multiplexers` relies on this sequence to hand out
/// multiplexer output ports.
///
/// For `SinkMultiple`, the sinks are node keys rather than streams. `visit`
/// therefore receives a temporary `PhysStream::first(sink)`, and any change the
/// callback makes to it is discarded.
fn visit_node_inputs_mut(
    roots: Vec<PhysNodeKey>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    mut visit: impl FnMut(&mut PhysStream),
) {
    let mut to_visit = roots;
    // Roots are marked as seen up front, so reaching one again through an
    // input edge does not schedule it a second time.
    let mut seen: SecondaryMap<PhysNodeKey, ()> =
        to_visit.iter().copied().map(|n| (n, ())).collect();
    // Schedules node `$n` for expansion unless it has already been seen.
    macro_rules! rec {
        ($n:expr) => {
            let n = $n;
            if seen.insert(n, ()).is_none() {
                to_visit.push(n)
            }
        };
    }
    while let Some(node) = to_visit.pop() {
        // In every arm, `rec!` reads an input's `node` before `visit` can
        // modify that input. This matters when the callback redirects the
        // stream (e.g. to a multiplexer): traversal must continue into the
        // original producer, not into the node the stream now points to.
        match &mut phys_sm[node].kind {
            PhysNodeKind::InMemorySource { .. }
            | PhysNodeKind::MultiScan { .. }
            | PhysNodeKind::InputIndependentSelect { .. } => {},
            #[cfg(feature = "python")]
            PhysNodeKind::PythonScan { .. } => {},
            PhysNodeKind::Select { input, .. }
            | PhysNodeKind::WithRowIndex { input, .. }
            | PhysNodeKind::Reduce { input, .. }
            | PhysNodeKind::StreamingSlice { input, .. }
            | PhysNodeKind::NegativeSlice { input, .. }
            | PhysNodeKind::Filter { input, .. }
            | PhysNodeKind::SimpleProjection { input, .. }
            | PhysNodeKind::InMemorySink { input }
            | PhysNodeKind::CallbackSink { input, .. }
            | PhysNodeKind::FileSink { input, .. }
            | PhysNodeKind::PartitionedSink { input, .. }
            | PhysNodeKind::InMemoryMap { input, .. }
            | PhysNodeKind::SortedGroupBy { input, .. }
            | PhysNodeKind::Map { input, .. }
            | PhysNodeKind::Sort { input, .. }
            | PhysNodeKind::Multiplexer { input }
            | PhysNodeKind::GatherEvery { input, .. }
            | PhysNodeKind::Rle(input)
            | PhysNodeKind::RleId(input)
            | PhysNodeKind::PeakMinMax { input, .. } => {
                rec!(input.node);
                visit(input);
            },

            #[cfg(feature = "dynamic_group_by")]
            PhysNodeKind::DynamicGroupBy { input, .. } => {
                rec!(input.node);
                visit(input);
            },
            #[cfg(feature = "dynamic_group_by")]
            PhysNodeKind::RollingGroupBy { input, .. } => {
                rec!(input.node);
                visit(input);
            },

            #[cfg(feature = "cum_agg")]
            PhysNodeKind::CumAgg { input, .. } => {
                rec!(input.node);
                visit(input);
            },

            PhysNodeKind::InMemoryJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::EquiJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::MergeJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::SemiAntiJoin {
                input_left,
                input_right,
                ..
            }
            | PhysNodeKind::CrossJoin {
                input_left,
                input_right,
                ..
            } => {
                rec!(input_left.node);
                rec!(input_right.node);
                visit(input_left);
                visit(input_right);
            },

            #[cfg(feature = "merge_sorted")]
            PhysNodeKind::MergeSorted {
                input_left,
                input_right,
                ..
            } => {
                rec!(input_left.node);
                rec!(input_right.node);
                visit(input_left);
                visit(input_right);
            },

            PhysNodeKind::TopK { input, k, .. } => {
                rec!(input.node);
                rec!(k.node);
                visit(input);
                visit(k);
            },

            PhysNodeKind::DynamicSlice {
                input,
                offset,
                length,
            } => {
                rec!(input.node);
                rec!(offset.node);
                rec!(length.node);
                visit(input);
                visit(offset);
                visit(length);
            },

            PhysNodeKind::Shift {
                input,
                offset,
                fill,
            } => {
                rec!(input.node);
                rec!(offset.node);
                if let Some(fill) = fill {
                    rec!(fill.node);
                }
                visit(input);
                visit(offset);
                if let Some(fill) = fill {
                    visit(fill);
                }
            },

            PhysNodeKind::Repeat { value, repeats } => {
                rec!(value.node);
                rec!(repeats.node);
                visit(value);
                visit(repeats);
            },

            PhysNodeKind::GroupBy { inputs, .. }
            | PhysNodeKind::OrderedUnion { inputs }
            | PhysNodeKind::UnorderedUnion { inputs }
            | PhysNodeKind::Zip { inputs, .. } => {
                for input in inputs {
                    rec!(input.node);
                    visit(input);
                }
            },

            PhysNodeKind::SinkMultiple { sinks } => {
                // `sinks` are node keys, not streams: the callback only sees a
                // temporary handle to port 0, so rewiring it has no effect.
                for sink in sinks {
                    rec!(*sink);
                    visit(&mut PhysStream::first(*sink));
                }
            },

            #[cfg(feature = "ewma")]
            PhysNodeKind::EwmMean { input, options: _ }
            | PhysNodeKind::EwmVar { input, options: _ }
            | PhysNodeKind::EwmStd { input, options: _ } => {
                rec!(input.node);
                visit(input)
            },
        }
    }
}
597
598fn insert_multiplexers(roots: Vec<PhysNodeKey>, phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>) {
599    let mut refcount = PlHashMap::new();
600    visit_node_inputs_mut(roots.clone(), phys_sm, |i| {
601        *refcount.entry(*i).or_insert(0) += 1;
602    });
603
604    let mut multiplexer_map: PlHashMap<PhysStream, PhysStream> = refcount
605        .into_iter()
606        .filter(|(_stream, refcount)| *refcount > 1)
607        .map(|(stream, _refcount)| {
608            let input_schema = phys_sm[stream.node].output_schema.clone();
609            let multiplexer_node = phys_sm.insert(PhysNode::new(
610                input_schema,
611                PhysNodeKind::Multiplexer { input: stream },
612            ));
613            (stream, PhysStream::first(multiplexer_node))
614        })
615        .collect();
616
617    visit_node_inputs_mut(roots, phys_sm, |i| {
618        if let Some(m) = multiplexer_map.get_mut(i) {
619            *i = *m;
620            m.port += 1;
621        }
622    });
623}
624
625pub fn build_physical_plan(
626    root: Node,
627    ir_arena: &mut Arena<IR>,
628    expr_arena: &mut Arena<AExpr>,
629    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
630    ctx: StreamingLowerIRContext,
631) -> PolarsResult<PhysNodeKey> {
632    let mut schema_cache = PlHashMap::with_capacity(ir_arena.len());
633    let mut expr_cache = ExprCache::with_capacity(expr_arena.len());
634    let mut cache_nodes = PlHashMap::new();
635    let phys_root = lower_ir::lower_ir(
636        root,
637        ir_arena,
638        expr_arena,
639        phys_sm,
640        &mut schema_cache,
641        &mut expr_cache,
642        &mut cache_nodes,
643        ctx,
644        None,
645    )?;
646    insert_multiplexers(vec![phys_root.node], phys_sm);
647    Ok(phys_root.node)
648}