polars_stream/
skeleton.rs

1#![allow(unused)] // TODO: remove me
2use std::cmp::Reverse;
3
4use polars_core::POOL;
5use polars_core::prelude::*;
6use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
7use polars_plan::plans::{Context, IR, IRPlan};
8use polars_plan::prelude::AExpr;
9use polars_plan::prelude::expr_ir::ExprIR;
10use polars_utils::arena::{Arena, Node};
11use slotmap::{SecondaryMap, SlotMap};
12
13use crate::graph::{Graph, GraphNodeKey};
14use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
15
16/// Executes the IR with the streaming engine.
17///
18/// Unsupported operations can fall back to the in-memory engine.
19///
20/// Returns:
21/// - `Ok(QueryResult::Single(DataFrame))` when collecting to a single sink.
22/// - `Ok(QueryResult::Multiple(Vec<DataFrame>))` when collecting to multiple sinks.
23/// - `Err` if the IR can't be executed.
24///
25/// Returned `DataFrame`s contain data only for memory sinks,
26/// `DataFrame`s corresponding to file sinks are empty.
27pub fn run_query(
28    node: Node,
29    ir_arena: &mut Arena<IR>,
30    expr_arena: &mut Arena<AExpr>,
31) -> PolarsResult<QueryResult> {
32    StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
33}
34
35/// Visualizes the physical plan as a dot graph.
36pub fn visualize_physical_plan(
37    node: Node,
38    ir_arena: &mut Arena<IR>,
39    expr_arena: &mut Arena<AExpr>,
40) -> PolarsResult<String> {
41    let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
42
43    let ctx = StreamingLowerIRContext {
44        prepare_visualization: true,
45    };
46    let root_phys_node =
47        crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
48
49    let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
50
51    Ok(out)
52}
53
54pub struct StreamingQuery {
55    top_ir: IR,
56    graph: Graph,
57    root_phys_node: PhysNodeKey,
58    phys_sm: SlotMap<PhysNodeKey, PhysNode>,
59    phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
60}
61
62impl StreamingQuery {
63    pub fn build(
64        node: Node,
65        ir_arena: &mut Arena<IR>,
66        expr_arena: &mut Arena<AExpr>,
67    ) -> PolarsResult<Self> {
68        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
69            let plan = IRPlan {
70                lp_top: node,
71                lp_arena: ir_arena.clone(),
72                expr_arena: expr_arena.clone(),
73            };
74            let visualization = plan.display_dot().to_string();
75            std::fs::write(visual_path, visualization).unwrap();
76        }
77        let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
78        let ctx = StreamingLowerIRContext {
79            prepare_visualization: false,
80        };
81        let root_phys_node = crate::physical_plan::build_physical_plan(
82            node,
83            ir_arena,
84            expr_arena,
85            &mut phys_sm,
86            ctx,
87        )?;
88        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
89            let visualization =
90                crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
91            std::fs::write(visual_path, visualization).unwrap();
92        }
93
94        let (mut graph, phys_to_graph) =
95            crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
96
97        let top_ir = ir_arena.get(node).clone();
98
99        let out = StreamingQuery {
100            top_ir,
101            graph,
102            root_phys_node,
103            phys_sm,
104            phys_to_graph,
105        };
106
107        Ok(out)
108    }
109
110    pub fn execute(self) -> PolarsResult<QueryResult> {
111        let StreamingQuery {
112            top_ir,
113            mut graph,
114            root_phys_node,
115            phys_sm,
116            phys_to_graph,
117        } = self;
118
119        crate::async_executor::clear_task_wait_statistics();
120        let mut results = crate::execute::execute_graph(&mut graph)?;
121
122        if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
123            let mut stats = crate::async_executor::get_task_wait_statistics();
124            stats.sort_by_key(|(_l, w)| Reverse(*w));
125            eprintln!("Time spent waiting for async tasks:");
126            for (loc, wait_time) in stats {
127                eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time);
128            }
129        }
130
131        match top_ir {
132            IR::SinkMultiple { inputs } => {
133                let phys_node = &phys_sm[root_phys_node];
134                let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
135                    unreachable!();
136                };
137
138                Ok(QueryResult::Multiple(
139                    sinks
140                        .iter()
141                        .map(|phys_node_key| {
142                            results
143                                .remove(phys_to_graph[*phys_node_key])
144                                .unwrap_or_else(DataFrame::empty)
145                        })
146                        .collect(),
147                ))
148            },
149            _ => Ok(QueryResult::Single(
150                results
151                    .remove(phys_to_graph[root_phys_node])
152                    .unwrap_or_else(DataFrame::empty),
153            )),
154        }
155    }
156}
157
158pub enum QueryResult {
159    Single(DataFrame),
160    /// Collected to multiple in-memory sinks
161    Multiple(Vec<DataFrame>),
162}
163
164impl QueryResult {
165    pub fn unwrap_single(self) -> DataFrame {
166        use QueryResult::*;
167        match self {
168            Single(df) => df,
169            Multiple(_) => panic!(),
170        }
171    }
172
173    pub fn unwrap_multiple(self) -> Vec<DataFrame> {
174        use QueryResult::*;
175        match self {
176            Single(_) => panic!(),
177            Multiple(dfs) => dfs,
178        }
179    }
180}