polars_stream/
skeleton.rs

1#![allow(unused)] // TODO: remove me
2use std::cmp::Reverse;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6use polars_core::POOL;
7use polars_core::prelude::*;
8use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
9use polars_plan::plans::{Context, IR, IRPlan};
10use polars_plan::prelude::AExpr;
11use polars_plan::prelude::expr_ir::ExprIR;
12use polars_utils::arena::{Arena, Node};
13use polars_utils::relaxed_cell::RelaxedCell;
14use slotmap::{SecondaryMap, SlotMap};
15
16use crate::graph::{Graph, GraphNodeKey};
17use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
18
19/// Executes the IR with the streaming engine.
20///
21/// Unsupported operations can fall back to the in-memory engine.
22///
23/// Returns:
24/// - `Ok(QueryResult::Single(DataFrame))` when collecting to a single sink.
25/// - `Ok(QueryResult::Multiple(Vec<DataFrame>))` when collecting to multiple sinks.
26/// - `Err` if the IR can't be executed.
27///
28/// Returned `DataFrame`s contain data only for memory sinks,
29/// `DataFrame`s corresponding to file sinks are empty.
30pub fn run_query(
31    node: Node,
32    ir_arena: &mut Arena<IR>,
33    expr_arena: &mut Arena<AExpr>,
34) -> PolarsResult<QueryResult> {
35    StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
36}
37
38/// Visualizes the physical plan as a dot graph.
39pub fn visualize_physical_plan(
40    node: Node,
41    ir_arena: &mut Arena<IR>,
42    expr_arena: &mut Arena<AExpr>,
43) -> PolarsResult<String> {
44    let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
45
46    let ctx = StreamingLowerIRContext {
47        prepare_visualization: true,
48    };
49    let root_phys_node =
50        crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
51
52    let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
53
54    Ok(out)
55}
56
/// A query that has been lowered to a physical plan and a compute graph.
///
/// Created by `StreamingQuery::build` and consumed by `StreamingQuery::execute`.
pub struct StreamingQuery {
    /// Clone of the IR at the root node; `execute` matches on it to choose
    /// between a single and a multiple result.
    top_ir: IR,
    /// Compute graph produced from the physical plan; this is what gets executed.
    graph: Graph,
    /// Key of the root node of the physical plan inside `phys_sm`.
    root_phys_node: PhysNodeKey,
    /// Storage for the physical plan nodes.
    phys_sm: SlotMap<PhysNodeKey, PhysNode>,
    /// Maps physical plan nodes to their graph nodes; not every physical node
    /// is looked up successfully (`execute` skips missing entries for metrics).
    phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
}
64
/// Configures if IR lowering creates the `format_str` for `InMemoryMap`.
///
/// Starts out as `false`. It becomes `true` either through
/// `always_prepare_visualization_data` or, lazily when a query is built, if
/// the `POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA` environment variable
/// equals `"1"`.
pub static PREPARE_VISUALIZATION_DATA: RelaxedCell<bool> = RelaxedCell::new_bool(false);
67
/// Sets config to ensure IR lowering always creates the `format_str` for `InMemoryMap`.
///
/// Stores `true` into `PREPARE_VISUALIZATION_DATA`. The flag is read when a
/// query is built, so this affects queries built after the call.
pub fn always_prepare_visualization_data() {
    PREPARE_VISUALIZATION_DATA.store(true);
}
72
73fn cfg_prepare_visualization_data() -> bool {
74    if !PREPARE_VISUALIZATION_DATA.load() {
75        PREPARE_VISUALIZATION_DATA.fetch_or(
76            std::env::var("POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA").as_deref() == Ok("1"),
77        );
78    }
79
80    PREPARE_VISUALIZATION_DATA.load()
81}
82
83impl StreamingQuery {
84    pub fn build(
85        node: Node,
86        ir_arena: &mut Arena<IR>,
87        expr_arena: &mut Arena<AExpr>,
88    ) -> PolarsResult<Self> {
89        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
90            let plan = IRPlan {
91                lp_top: node,
92                lp_arena: ir_arena.clone(),
93                expr_arena: expr_arena.clone(),
94            };
95            let visualization = plan.display_dot().to_string();
96            std::fs::write(visual_path, visualization).unwrap();
97        }
98        let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
99        let ctx = StreamingLowerIRContext {
100            prepare_visualization: cfg_prepare_visualization_data(),
101        };
102        let root_phys_node = crate::physical_plan::build_physical_plan(
103            node,
104            ir_arena,
105            expr_arena,
106            &mut phys_sm,
107            ctx,
108        )?;
109        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
110            let visualization =
111                crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
112            std::fs::write(visual_path, visualization).unwrap();
113        }
114
115        let (mut graph, phys_to_graph) =
116            crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
117
118        let top_ir = ir_arena.get(node).clone();
119
120        let out = StreamingQuery {
121            top_ir,
122            graph,
123            root_phys_node,
124            phys_sm,
125            phys_to_graph,
126        };
127
128        Ok(out)
129    }
130
131    pub fn execute(self) -> PolarsResult<QueryResult> {
132        let StreamingQuery {
133            top_ir,
134            mut graph,
135            root_phys_node,
136            phys_sm,
137            phys_to_graph,
138        } = self;
139
140        let metrics = if std::env::var("POLARS_TRACK_METRICS").as_deref() == Ok("1") {
141            crate::async_executor::track_task_metrics(true);
142            Some(Arc::default())
143        } else {
144            None
145        };
146
147        let query_start = Instant::now();
148        let mut results = crate::execute::execute_graph(&mut graph, metrics.clone())?;
149        let query_elapsed = query_start.elapsed();
150
151        // Print metrics.
152        if let Some(lock) = metrics {
153            let mut total_query_ns = 0;
154            let mut lines = Vec::new();
155            let m = lock.lock();
156            for phys_node_key in phys_sm.keys() {
157                let Some(graph_node_key) = phys_to_graph.get(phys_node_key) else {
158                    continue;
159                };
160                let Some(node_metrics) = m.get(*graph_node_key) else {
161                    continue;
162                };
163                let name = graph.nodes[*graph_node_key].compute.name();
164                let total_ns =
165                    node_metrics.total_poll_time_ns + node_metrics.total_state_update_time_ns;
166                let total_time = Duration::from_nanos(total_ns);
167                let poll_time = Duration::from_nanos(node_metrics.total_poll_time_ns);
168                let update_time = Duration::from_nanos(node_metrics.total_state_update_time_ns);
169                let max_poll_time = Duration::from_nanos(node_metrics.max_poll_time_ns);
170                let max_update_time = Duration::from_nanos(node_metrics.max_state_update_time_ns);
171                let total_polls = node_metrics.total_polls;
172                let total_updates = node_metrics.total_state_updates;
173                let perc_stolen = node_metrics.total_stolen_polls as f64
174                    / node_metrics.total_polls as f64
175                    * 100.0;
176
177                let rows_received = node_metrics.rows_received;
178                let morsels_received = node_metrics.morsels_received;
179                let max_received = node_metrics.largest_morsel_received;
180                let rows_sent = node_metrics.rows_sent;
181                let morsels_sent = node_metrics.morsels_sent;
182                let max_sent = node_metrics.largest_morsel_sent;
183
184                lines.push(
185                    (total_time, format!(
186                        "{name}: tot({total_time:.2?}), \
187                                 poll({poll_time:.2?}, n={total_polls}, max={max_poll_time:.2?}, stolen={perc_stolen:.1}%), \
188                                 update({update_time:.2?}, n={total_updates}, max={max_update_time:.2?}), \
189                                 recv(row={rows_received}, morsel={morsels_received}, max={max_received}), \
190                                 sent(row={rows_sent}, morsel={morsels_sent}, max={max_sent})"))
191                );
192
193                total_query_ns += total_ns;
194            }
195            lines.sort_by_key(|(tot, _)| Reverse(*tot));
196
197            let total_query_time = Duration::from_nanos(total_query_ns);
198            eprintln!(
199                "Streaming query took {query_elapsed:.2?} ({total_query_time:.2?} CPU), detailed breakdown:"
200            );
201            for (_tot, line) in lines {
202                eprintln!("{line}");
203            }
204            eprintln!();
205        }
206
207        match top_ir {
208            IR::SinkMultiple { inputs } => {
209                let phys_node = &phys_sm[root_phys_node];
210                let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
211                    unreachable!();
212                };
213
214                Ok(QueryResult::Multiple(
215                    sinks
216                        .iter()
217                        .map(|phys_node_key| {
218                            results
219                                .remove(phys_to_graph[*phys_node_key])
220                                .unwrap_or_else(DataFrame::empty)
221                        })
222                        .collect(),
223                ))
224            },
225            _ => Ok(QueryResult::Single(
226                results
227                    .remove(phys_to_graph[root_phys_node])
228                    .unwrap_or_else(DataFrame::empty),
229            )),
230        }
231    }
232}
233
234pub enum QueryResult {
235    Single(DataFrame),
236    /// Collected to multiple in-memory sinks
237    Multiple(Vec<DataFrame>),
238}
239
240impl QueryResult {
241    pub fn unwrap_single(self) -> DataFrame {
242        use QueryResult::*;
243        match self {
244            Single(df) => df,
245            Multiple(_) => panic!(),
246        }
247    }
248
249    pub fn unwrap_multiple(self) -> Vec<DataFrame> {
250        use QueryResult::*;
251        match self {
252            Single(_) => panic!(),
253            Multiple(dfs) => dfs,
254        }
255    }
256}