Skip to main content

polars_stream/
skeleton.rs

1#![allow(unused)] // TODO: remove me
2use std::cmp::Reverse;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6use polars_core::POOL;
7use polars_core::prelude::*;
8use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
9use polars_plan::plans::{IR, IRPlan};
10use polars_plan::prelude::AExpr;
11use polars_plan::prelude::expr_ir::ExprIR;
12use polars_utils::arena::{Arena, Node};
13use polars_utils::relaxed_cell::RelaxedCell;
14use slotmap::{SecondaryMap, SlotMap};
15
16use crate::graph::{Graph, GraphNodeKey};
17use crate::metrics::GraphMetrics;
18use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
19
20/// Executes the IR with the streaming engine.
21///
22/// Unsupported operations can fall back to the in-memory engine.
23///
24/// Returns:
25/// - `Ok(QueryResult::Single(DataFrame))` when collecting to a single sink.
26/// - `Ok(QueryResult::Multiple(Vec<DataFrame>))` when collecting to multiple sinks.
27/// - `Err` if the IR can't be executed.
28///
29/// Returned `DataFrame`s contain data only for memory sinks,
30/// `DataFrame`s corresponding to file sinks are empty.
31pub fn run_query(
32    node: Node,
33    ir_arena: &mut Arena<IR>,
34    expr_arena: &mut Arena<AExpr>,
35) -> PolarsResult<QueryResult> {
36    StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
37}
38
39/// Visualizes the physical plan as a dot graph.
40pub fn visualize_physical_plan(
41    node: Node,
42    ir_arena: &mut Arena<IR>,
43    expr_arena: &mut Arena<AExpr>,
44) -> PolarsResult<String> {
45    let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
46
47    let ctx = StreamingLowerIRContext {
48        prepare_visualization: true,
49    };
50    let root_phys_node =
51        crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
52
53    let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
54
55    Ok(out)
56}
57
58pub struct StreamingQuery {
59    top_ir: IR,
60    pub graph: Graph,
61    pub root_phys_node: PhysNodeKey,
62    pub phys_sm: SlotMap<PhysNodeKey, PhysNode>,
63    pub phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
64    pub metrics: Option<Arc<Mutex<GraphMetrics>>>,
65}
66
67/// Configures if IR lowering creates the `format_str` for `InMemoryMap`.
68pub static PREPARE_VISUALIZATION_DATA: RelaxedCell<bool> = RelaxedCell::new_bool(false);
69
70/// Sets config to ensure IR lowering always creates the `format_str` for `InMemoryMap`.
71pub fn always_prepare_visualization_data() {
72    PREPARE_VISUALIZATION_DATA.store(true);
73}
74
75fn cfg_prepare_visualization_data() -> bool {
76    if !PREPARE_VISUALIZATION_DATA.load() {
77        PREPARE_VISUALIZATION_DATA.fetch_or(
78            std::env::var("POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA").as_deref() == Ok("1"),
79        );
80    }
81
82    PREPARE_VISUALIZATION_DATA.load()
83}
84
85impl StreamingQuery {
86    pub fn build(
87        node: Node,
88        ir_arena: &mut Arena<IR>,
89        expr_arena: &mut Arena<AExpr>,
90    ) -> PolarsResult<Self> {
91        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
92            let plan = IRPlan {
93                lp_top: node,
94                lp_arena: ir_arena.clone(),
95                expr_arena: expr_arena.clone(),
96            };
97            let visualization = plan.display_dot().to_string();
98            std::fs::write(visual_path, visualization).unwrap();
99        }
100        let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
101        let ctx = StreamingLowerIRContext {
102            prepare_visualization: cfg_prepare_visualization_data(),
103        };
104        let root_phys_node = crate::physical_plan::build_physical_plan(
105            node,
106            ir_arena,
107            expr_arena,
108            &mut phys_sm,
109            ctx,
110        )?;
111        if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
112            let visualization =
113                crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
114            std::fs::write(visual_path, visualization).unwrap();
115        }
116
117        let (mut graph, phys_to_graph) =
118            crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
119
120        let top_ir = ir_arena.get(node).clone();
121
122        let metrics = if std::env::var("POLARS_TRACK_METRICS").as_deref() == Ok("1")
123            || std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
124        {
125            crate::async_executor::track_task_metrics(true);
126            Some(Arc::default())
127        } else {
128            None
129        };
130
131        let out = StreamingQuery {
132            top_ir,
133            graph,
134            root_phys_node,
135            phys_sm,
136            phys_to_graph,
137            metrics,
138        };
139
140        Ok(out)
141    }
142
143    pub fn execute(self) -> PolarsResult<QueryResult> {
144        let StreamingQuery {
145            top_ir,
146            mut graph,
147            root_phys_node,
148            phys_sm,
149            phys_to_graph,
150            metrics,
151        } = self;
152
153        let query_start = Instant::now();
154        let mut results = crate::execute::execute_graph(&mut graph, metrics.clone())?;
155        let query_elapsed = query_start.elapsed();
156
157        // Print metrics.
158        if let Some(lock) = metrics
159            && std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
160        {
161            let mut total_query_ns = 0;
162            let mut lines = Vec::new();
163            let m = lock.lock();
164            for phys_node_key in phys_sm.keys() {
165                let Some(graph_node_key) = phys_to_graph.get(phys_node_key) else {
166                    continue;
167                };
168                let Some(node_metrics) = m.get(*graph_node_key) else {
169                    continue;
170                };
171                let name = graph.nodes[*graph_node_key].compute.name();
172                let total_ns =
173                    node_metrics.total_poll_time_ns + node_metrics.total_state_update_time_ns;
174                let total_time = Duration::from_nanos(total_ns);
175                let poll_time = Duration::from_nanos(node_metrics.total_poll_time_ns);
176                let update_time = Duration::from_nanos(node_metrics.total_state_update_time_ns);
177                let max_poll_time = Duration::from_nanos(node_metrics.max_poll_time_ns);
178                let max_update_time = Duration::from_nanos(node_metrics.max_state_update_time_ns);
179                let total_polls = node_metrics.total_polls;
180                let total_updates = node_metrics.total_state_updates;
181                let perc_stolen = node_metrics.total_stolen_polls as f64
182                    / node_metrics.total_polls as f64
183                    * 100.0;
184
185                let rows_received = node_metrics.rows_received;
186                let morsels_received = node_metrics.morsels_received;
187                let max_received = node_metrics.largest_morsel_received;
188                let rows_sent = node_metrics.rows_sent;
189                let morsels_sent = node_metrics.morsels_sent;
190                let max_sent = node_metrics.largest_morsel_sent;
191
192                let io_total_active_time = Duration::from_nanos(node_metrics.io_total_active_ns);
193                let io_total_bytes_requested = node_metrics.io_total_bytes_requested;
194                let io_total_bytes_received = node_metrics.io_total_bytes_received;
195
196                lines.push(
197                    (total_time, format!(
198                        "{name}: tot({total_time:.2?}), \
199                                 poll({poll_time:.2?}, n={total_polls}, max={max_poll_time:.2?}, stolen={perc_stolen:.1}%), \
200                                 update({update_time:.2?}, n={total_updates}, max={max_update_time:.2?}), \
201                                 recv(row={rows_received}, morsel={morsels_received}, max={max_received}), \
202                                 sent(row={rows_sent}, morsel={morsels_sent}, max={max_sent}), \
203                                 io(\
204                                    total_active_time={io_total_active_time:.2?}, \
205                                    total_bytes_requested={io_total_bytes_requested}, \
206                                    total_bytes_received={io_total_bytes_received})"))
207                );
208
209                total_query_ns += total_ns;
210            }
211            lines.sort_by_key(|(tot, _)| Reverse(*tot));
212
213            let total_query_time = Duration::from_nanos(total_query_ns);
214            eprintln!(
215                "Streaming query took {query_elapsed:.2?} ({total_query_time:.2?} CPU), detailed breakdown:"
216            );
217            for (_tot, line) in lines {
218                eprintln!("{line}");
219            }
220            eprintln!();
221        }
222
223        match top_ir {
224            IR::SinkMultiple { inputs } => {
225                let phys_node = &phys_sm[root_phys_node];
226                let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
227                    unreachable!();
228                };
229
230                Ok(QueryResult::Multiple(
231                    sinks
232                        .iter()
233                        .map(|phys_node_key| {
234                            results
235                                .remove(phys_to_graph[*phys_node_key])
236                                .unwrap_or_else(DataFrame::empty)
237                        })
238                        .collect(),
239                ))
240            },
241            _ => Ok(QueryResult::Single(
242                results
243                    .remove(phys_to_graph[root_phys_node])
244                    .unwrap_or_else(DataFrame::empty),
245            )),
246        }
247    }
248}
249
250pub enum QueryResult {
251    Single(DataFrame),
252    /// Collected to multiple in-memory sinks
253    Multiple(Vec<DataFrame>),
254}
255
256impl QueryResult {
257    pub fn unwrap_single(self) -> DataFrame {
258        use QueryResult::*;
259        match self {
260            Single(df) => df,
261            Multiple(_) => panic!(),
262        }
263    }
264
265    pub fn unwrap_multiple(self) -> Vec<DataFrame> {
266        use QueryResult::*;
267        match self {
268            Single(_) => panic!(),
269            Multiple(dfs) => dfs,
270        }
271    }
272}