1#![allow(unused)] use std::cmp::Reverse;
3use std::time::{Duration, Instant};
4
5use parking_lot::Mutex;
6use polars_core::POOL;
7use polars_core::prelude::*;
8use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
9use polars_plan::plans::{Context, IR, IRPlan};
10use polars_plan::prelude::AExpr;
11use polars_plan::prelude::expr_ir::ExprIR;
12use polars_utils::arena::{Arena, Node};
13use polars_utils::relaxed_cell::RelaxedCell;
14use slotmap::{SecondaryMap, SlotMap};
15
16use crate::graph::{Graph, GraphNodeKey};
17use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
18
19pub fn run_query(
31 node: Node,
32 ir_arena: &mut Arena<IR>,
33 expr_arena: &mut Arena<AExpr>,
34) -> PolarsResult<QueryResult> {
35 StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
36}
37
38pub fn visualize_physical_plan(
40 node: Node,
41 ir_arena: &mut Arena<IR>,
42 expr_arena: &mut Arena<AExpr>,
43) -> PolarsResult<String> {
44 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
45
46 let ctx = StreamingLowerIRContext {
47 prepare_visualization: true,
48 };
49 let root_phys_node =
50 crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
51
52 let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
53
54 Ok(out)
55}
56
57pub struct StreamingQuery {
58 top_ir: IR,
59 graph: Graph,
60 root_phys_node: PhysNodeKey,
61 phys_sm: SlotMap<PhysNodeKey, PhysNode>,
62 phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
63}
64
65pub static PREPARE_VISUALIZATION_DATA: RelaxedCell<bool> = RelaxedCell::new_bool(false);
67
68pub fn always_prepare_visualization_data() {
70 PREPARE_VISUALIZATION_DATA.store(true);
71}
72
73fn cfg_prepare_visualization_data() -> bool {
74 if !PREPARE_VISUALIZATION_DATA.load() {
75 PREPARE_VISUALIZATION_DATA.fetch_or(
76 std::env::var("POLARS_STREAM_ALWAYS_PREPARE_VISUALIZATION_DATA").as_deref() == Ok("1"),
77 );
78 }
79
80 PREPARE_VISUALIZATION_DATA.load()
81}
82
83impl StreamingQuery {
84 pub fn build(
85 node: Node,
86 ir_arena: &mut Arena<IR>,
87 expr_arena: &mut Arena<AExpr>,
88 ) -> PolarsResult<Self> {
89 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
90 let plan = IRPlan {
91 lp_top: node,
92 lp_arena: ir_arena.clone(),
93 expr_arena: expr_arena.clone(),
94 };
95 let visualization = plan.display_dot().to_string();
96 std::fs::write(visual_path, visualization).unwrap();
97 }
98 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
99 let ctx = StreamingLowerIRContext {
100 prepare_visualization: cfg_prepare_visualization_data(),
101 };
102 let root_phys_node = crate::physical_plan::build_physical_plan(
103 node,
104 ir_arena,
105 expr_arena,
106 &mut phys_sm,
107 ctx,
108 )?;
109 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
110 let visualization =
111 crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
112 std::fs::write(visual_path, visualization).unwrap();
113 }
114
115 let (mut graph, phys_to_graph) =
116 crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
117
118 let top_ir = ir_arena.get(node).clone();
119
120 let out = StreamingQuery {
121 top_ir,
122 graph,
123 root_phys_node,
124 phys_sm,
125 phys_to_graph,
126 };
127
128 Ok(out)
129 }
130
131 pub fn execute(self) -> PolarsResult<QueryResult> {
132 let StreamingQuery {
133 top_ir,
134 mut graph,
135 root_phys_node,
136 phys_sm,
137 phys_to_graph,
138 } = self;
139
140 let metrics = if std::env::var("POLARS_TRACK_METRICS").as_deref() == Ok("1") {
141 crate::async_executor::track_task_metrics(true);
142 Some(Arc::default())
143 } else {
144 None
145 };
146
147 let query_start = Instant::now();
148 let mut results = crate::execute::execute_graph(&mut graph, metrics.clone())?;
149 let query_elapsed = query_start.elapsed();
150
151 if let Some(lock) = metrics {
153 let mut total_query_ns = 0;
154 let mut lines = Vec::new();
155 let m = lock.lock();
156 for phys_node_key in phys_sm.keys() {
157 let Some(graph_node_key) = phys_to_graph.get(phys_node_key) else {
158 continue;
159 };
160 let Some(node_metrics) = m.get(*graph_node_key) else {
161 continue;
162 };
163 let name = graph.nodes[*graph_node_key].compute.name();
164 let total_ns =
165 node_metrics.total_poll_time_ns + node_metrics.total_state_update_time_ns;
166 let total_time = Duration::from_nanos(total_ns);
167 let poll_time = Duration::from_nanos(node_metrics.total_poll_time_ns);
168 let update_time = Duration::from_nanos(node_metrics.total_state_update_time_ns);
169 let max_poll_time = Duration::from_nanos(node_metrics.max_poll_time_ns);
170 let max_update_time = Duration::from_nanos(node_metrics.max_state_update_time_ns);
171 let total_polls = node_metrics.total_polls;
172 let total_updates = node_metrics.total_state_updates;
173 let perc_stolen = node_metrics.total_stolen_polls as f64
174 / node_metrics.total_polls as f64
175 * 100.0;
176
177 let rows_received = node_metrics.rows_received;
178 let morsels_received = node_metrics.morsels_received;
179 let max_received = node_metrics.largest_morsel_received;
180 let rows_sent = node_metrics.rows_sent;
181 let morsels_sent = node_metrics.morsels_sent;
182 let max_sent = node_metrics.largest_morsel_sent;
183
184 lines.push(
185 (total_time, format!(
186 "{name}: tot({total_time:.2?}), \
187 poll({poll_time:.2?}, n={total_polls}, max={max_poll_time:.2?}, stolen={perc_stolen:.1}%), \
188 update({update_time:.2?}, n={total_updates}, max={max_update_time:.2?}), \
189 recv(row={rows_received}, morsel={morsels_received}, max={max_received}), \
190 sent(row={rows_sent}, morsel={morsels_sent}, max={max_sent})"))
191 );
192
193 total_query_ns += total_ns;
194 }
195 lines.sort_by_key(|(tot, _)| Reverse(*tot));
196
197 let total_query_time = Duration::from_nanos(total_query_ns);
198 eprintln!(
199 "Streaming query took {query_elapsed:.2?} ({total_query_time:.2?} CPU), detailed breakdown:"
200 );
201 for (_tot, line) in lines {
202 eprintln!("{line}");
203 }
204 eprintln!();
205 }
206
207 match top_ir {
208 IR::SinkMultiple { inputs } => {
209 let phys_node = &phys_sm[root_phys_node];
210 let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
211 unreachable!();
212 };
213
214 Ok(QueryResult::Multiple(
215 sinks
216 .iter()
217 .map(|phys_node_key| {
218 results
219 .remove(phys_to_graph[*phys_node_key])
220 .unwrap_or_else(DataFrame::empty)
221 })
222 .collect(),
223 ))
224 },
225 _ => Ok(QueryResult::Single(
226 results
227 .remove(phys_to_graph[root_phys_node])
228 .unwrap_or_else(DataFrame::empty),
229 )),
230 }
231 }
232}
233
234pub enum QueryResult {
235 Single(DataFrame),
236 Multiple(Vec<DataFrame>),
238}
239
240impl QueryResult {
241 pub fn unwrap_single(self) -> DataFrame {
242 use QueryResult::*;
243 match self {
244 Single(df) => df,
245 Multiple(_) => panic!(),
246 }
247 }
248
249 pub fn unwrap_multiple(self) -> Vec<DataFrame> {
250 use QueryResult::*;
251 match self {
252 Single(_) => panic!(),
253 Multiple(dfs) => dfs,
254 }
255 }
256}