1#![allow(unused)] use std::cmp::Reverse;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6use polars_core::POOL;
7use polars_core::prelude::*;
8use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
9use polars_plan::plans::{IR, IRPlan};
10use polars_plan::prelude::AExpr;
11use polars_plan::prelude::expr_ir::ExprIR;
12use polars_utils::arena::{Arena, Node};
13use polars_utils::relaxed_cell::RelaxedCell;
14use slotmap::{SecondaryMap, SlotMap};
15
16use crate::graph::{Graph, GraphNodeKey};
17use crate::metrics::GraphMetrics;
18use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
19
20pub fn run_query(
32 node: Node,
33 ir_arena: &mut Arena<IR>,
34 expr_arena: &mut Arena<AExpr>,
35) -> PolarsResult<QueryResult> {
36 StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
37}
38
39pub fn visualize_physical_plan(
41 node: Node,
42 ir_arena: &mut Arena<IR>,
43 expr_arena: &mut Arena<AExpr>,
44) -> PolarsResult<String> {
45 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
46
47 let ctx = StreamingLowerIRContext {
48 prepare_visualization: true,
49 };
50 let root_phys_node =
51 crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
52
53 let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
54
55 Ok(out)
56}
57
/// A query that has been lowered to a physical plan and an executable graph.
///
/// Created by `StreamingQuery::build` and consumed by `StreamingQuery::execute`.
pub struct StreamingQuery {
    /// Clone of the IR node the query was built from; `execute` matches on it to decide
    /// whether to return a single result or multiple results.
    top_ir: IR,
    /// The graph produced from the physical plan; this is what gets executed.
    pub graph: Graph,
    /// Key of the root node of the physical plan inside `phys_sm`.
    pub root_phys_node: PhysNodeKey,
    /// Storage for all physical plan nodes.
    pub phys_sm: SlotMap<PhysNodeKey, PhysNode>,
    /// Maps physical plan nodes to their corresponding graph nodes.
    pub phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
    /// Metrics store; `Some` only when `POLARS_TRACK_METRICS` or `POLARS_LOG_METRICS`
    /// was `"1"` at build time.
    pub metrics: Option<Arc<Mutex<GraphMetrics>>>,
}
66
/// Global flag controlling whether `StreamingQuery::build` lowers the plan with
/// `prepare_visualization` enabled. Starts out `false`.
pub static PREPARE_VISUALIZATION_DATA: RelaxedCell<bool> = RelaxedCell::new_bool(false);
69
/// Sets [`PREPARE_VISUALIZATION_DATA`] to `true`, so that queries built afterwards
/// prepare visualization data.
pub fn always_prepare_visualization_data() {
    PREPARE_VISUALIZATION_DATA.store(true);
}
74
75fn cfg_prepare_visualization_data() -> bool {
76 if !PREPARE_VISUALIZATION_DATA.load() {
77 PREPARE_VISUALIZATION_DATA.fetch_or(
78 std::env::var("POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA").as_deref() == Ok("1"),
79 );
80 }
81
82 PREPARE_VISUALIZATION_DATA.load()
83}
84
85impl StreamingQuery {
86 pub fn build(
87 node: Node,
88 ir_arena: &mut Arena<IR>,
89 expr_arena: &mut Arena<AExpr>,
90 ) -> PolarsResult<Self> {
91 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
92 let plan = IRPlan {
93 lp_top: node,
94 lp_arena: ir_arena.clone(),
95 expr_arena: expr_arena.clone(),
96 };
97 let visualization = plan.display_dot().to_string();
98 std::fs::write(visual_path, visualization).unwrap();
99 }
100 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
101 let ctx = StreamingLowerIRContext {
102 prepare_visualization: cfg_prepare_visualization_data(),
103 };
104 let root_phys_node = crate::physical_plan::build_physical_plan(
105 node,
106 ir_arena,
107 expr_arena,
108 &mut phys_sm,
109 ctx,
110 )?;
111 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
112 let visualization =
113 crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
114 std::fs::write(visual_path, visualization).unwrap();
115 }
116
117 let (mut graph, phys_to_graph) =
118 crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
119
120 let top_ir = ir_arena.get(node).clone();
121
122 let metrics = if std::env::var("POLARS_TRACK_METRICS").as_deref() == Ok("1")
123 || std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
124 {
125 crate::async_executor::track_task_metrics(true);
126 Some(Arc::default())
127 } else {
128 None
129 };
130
131 let out = StreamingQuery {
132 top_ir,
133 graph,
134 root_phys_node,
135 phys_sm,
136 phys_to_graph,
137 metrics,
138 };
139
140 Ok(out)
141 }
142
143 pub fn execute(self) -> PolarsResult<QueryResult> {
144 let StreamingQuery {
145 top_ir,
146 mut graph,
147 root_phys_node,
148 phys_sm,
149 phys_to_graph,
150 metrics,
151 } = self;
152
153 let query_start = Instant::now();
154 let mut results = crate::execute::execute_graph(&mut graph, metrics.clone())?;
155 let query_elapsed = query_start.elapsed();
156
157 if let Some(lock) = metrics
159 && std::env::var("POLARS_LOG_METRICS").as_deref() == Ok("1")
160 {
161 let mut total_query_ns = 0;
162 let mut lines = Vec::new();
163 let m = lock.lock();
164 for phys_node_key in phys_sm.keys() {
165 let Some(graph_node_key) = phys_to_graph.get(phys_node_key) else {
166 continue;
167 };
168 let Some(node_metrics) = m.get(*graph_node_key) else {
169 continue;
170 };
171 let name = graph.nodes[*graph_node_key].compute.name();
172 let total_ns =
173 node_metrics.total_poll_time_ns + node_metrics.total_state_update_time_ns;
174 let total_time = Duration::from_nanos(total_ns);
175 let poll_time = Duration::from_nanos(node_metrics.total_poll_time_ns);
176 let update_time = Duration::from_nanos(node_metrics.total_state_update_time_ns);
177 let max_poll_time = Duration::from_nanos(node_metrics.max_poll_time_ns);
178 let max_update_time = Duration::from_nanos(node_metrics.max_state_update_time_ns);
179 let total_polls = node_metrics.total_polls;
180 let total_updates = node_metrics.total_state_updates;
181 let perc_stolen = node_metrics.total_stolen_polls as f64
182 / node_metrics.total_polls as f64
183 * 100.0;
184
185 let rows_received = node_metrics.rows_received;
186 let morsels_received = node_metrics.morsels_received;
187 let max_received = node_metrics.largest_morsel_received;
188 let rows_sent = node_metrics.rows_sent;
189 let morsels_sent = node_metrics.morsels_sent;
190 let max_sent = node_metrics.largest_morsel_sent;
191
192 let io_total_active_time = Duration::from_nanos(node_metrics.io_total_active_ns);
193 let io_total_bytes_requested = node_metrics.io_total_bytes_requested;
194 let io_total_bytes_received = node_metrics.io_total_bytes_received;
195
196 lines.push(
197 (total_time, format!(
198 "{name}: tot({total_time:.2?}), \
199 poll({poll_time:.2?}, n={total_polls}, max={max_poll_time:.2?}, stolen={perc_stolen:.1}%), \
200 update({update_time:.2?}, n={total_updates}, max={max_update_time:.2?}), \
201 recv(row={rows_received}, morsel={morsels_received}, max={max_received}), \
202 sent(row={rows_sent}, morsel={morsels_sent}, max={max_sent}), \
203 io(\
204 total_active_time={io_total_active_time:.2?}, \
205 total_bytes_requested={io_total_bytes_requested}, \
206 total_bytes_received={io_total_bytes_received})"))
207 );
208
209 total_query_ns += total_ns;
210 }
211 lines.sort_by_key(|(tot, _)| Reverse(*tot));
212
213 let total_query_time = Duration::from_nanos(total_query_ns);
214 eprintln!(
215 "Streaming query took {query_elapsed:.2?} ({total_query_time:.2?} CPU), detailed breakdown:"
216 );
217 for (_tot, line) in lines {
218 eprintln!("{line}");
219 }
220 eprintln!();
221 }
222
223 match top_ir {
224 IR::SinkMultiple { inputs } => {
225 let phys_node = &phys_sm[root_phys_node];
226 let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
227 unreachable!();
228 };
229
230 Ok(QueryResult::Multiple(
231 sinks
232 .iter()
233 .map(|phys_node_key| {
234 results
235 .remove(phys_to_graph[*phys_node_key])
236 .unwrap_or_else(DataFrame::empty)
237 })
238 .collect(),
239 ))
240 },
241 _ => Ok(QueryResult::Single(
242 results
243 .remove(phys_to_graph[root_phys_node])
244 .unwrap_or_else(DataFrame::empty),
245 )),
246 }
247 }
248}
249
/// The outcome of executing a `StreamingQuery`.
pub enum QueryResult {
    /// The result of a query whose top IR node is not `IR::SinkMultiple`.
    Single(DataFrame),
    /// One `DataFrame` per sink of an `IR::SinkMultiple` query, in sink order.
    Multiple(Vec<DataFrame>),
}
255
256impl QueryResult {
257 pub fn unwrap_single(self) -> DataFrame {
258 use QueryResult::*;
259 match self {
260 Single(df) => df,
261 Multiple(_) => panic!(),
262 }
263 }
264
265 pub fn unwrap_multiple(self) -> Vec<DataFrame> {
266 use QueryResult::*;
267 match self {
268 Single(_) => panic!(),
269 Multiple(dfs) => dfs,
270 }
271 }
272}