polars_stream/
skeleton.rs1#![allow(unused)] use std::cmp::Reverse;
3
4use polars_core::POOL;
5use polars_core::prelude::*;
6use polars_expr::planner::{ExpressionConversionState, create_physical_expr, get_expr_depth_limit};
7use polars_plan::plans::{Context, IR, IRPlan};
8use polars_plan::prelude::AExpr;
9use polars_plan::prelude::expr_ir::ExprIR;
10use polars_utils::arena::{Arena, Node};
11use slotmap::{SecondaryMap, SlotMap};
12
13use crate::graph::{Graph, GraphNodeKey};
14use crate::physical_plan::{PhysNode, PhysNodeKey, PhysNodeKind, StreamingLowerIRContext};
15
16pub fn run_query(
28 node: Node,
29 ir_arena: &mut Arena<IR>,
30 expr_arena: &mut Arena<AExpr>,
31) -> PolarsResult<QueryResult> {
32 StreamingQuery::build(node, ir_arena, expr_arena)?.execute()
33}
34
35pub fn visualize_physical_plan(
37 node: Node,
38 ir_arena: &mut Arena<IR>,
39 expr_arena: &mut Arena<AExpr>,
40) -> PolarsResult<String> {
41 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
42
43 let ctx = StreamingLowerIRContext {
44 prepare_visualization: true,
45 };
46 let root_phys_node =
47 crate::physical_plan::build_physical_plan(node, ir_arena, expr_arena, &mut phys_sm, ctx)?;
48
49 let out = crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
50
51 Ok(out)
52}
53
54pub struct StreamingQuery {
55 top_ir: IR,
56 graph: Graph,
57 root_phys_node: PhysNodeKey,
58 phys_sm: SlotMap<PhysNodeKey, PhysNode>,
59 phys_to_graph: SecondaryMap<PhysNodeKey, GraphNodeKey>,
60}
61
62impl StreamingQuery {
63 pub fn build(
64 node: Node,
65 ir_arena: &mut Arena<IR>,
66 expr_arena: &mut Arena<AExpr>,
67 ) -> PolarsResult<Self> {
68 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_IR") {
69 let plan = IRPlan {
70 lp_top: node,
71 lp_arena: ir_arena.clone(),
72 expr_arena: expr_arena.clone(),
73 };
74 let visualization = plan.display_dot().to_string();
75 std::fs::write(visual_path, visualization).unwrap();
76 }
77 let mut phys_sm = SlotMap::with_capacity_and_key(ir_arena.len());
78 let ctx = StreamingLowerIRContext {
79 prepare_visualization: false,
80 };
81 let root_phys_node = crate::physical_plan::build_physical_plan(
82 node,
83 ir_arena,
84 expr_arena,
85 &mut phys_sm,
86 ctx,
87 )?;
88 if let Ok(visual_path) = std::env::var("POLARS_VISUALIZE_PHYSICAL_PLAN") {
89 let visualization =
90 crate::physical_plan::visualize_plan(root_phys_node, &phys_sm, expr_arena);
91 std::fs::write(visual_path, visualization).unwrap();
92 }
93
94 let (mut graph, phys_to_graph) =
95 crate::physical_plan::physical_plan_to_graph(root_phys_node, &phys_sm, expr_arena)?;
96
97 let top_ir = ir_arena.get(node).clone();
98
99 let out = StreamingQuery {
100 top_ir,
101 graph,
102 root_phys_node,
103 phys_sm,
104 phys_to_graph,
105 };
106
107 Ok(out)
108 }
109
110 pub fn execute(self) -> PolarsResult<QueryResult> {
111 let StreamingQuery {
112 top_ir,
113 mut graph,
114 root_phys_node,
115 phys_sm,
116 phys_to_graph,
117 } = self;
118
119 crate::async_executor::clear_task_wait_statistics();
120 let mut results = crate::execute::execute_graph(&mut graph)?;
121
122 if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
123 let mut stats = crate::async_executor::get_task_wait_statistics();
124 stats.sort_by_key(|(_l, w)| Reverse(*w));
125 eprintln!("Time spent waiting for async tasks:");
126 for (loc, wait_time) in stats {
127 eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time);
128 }
129 }
130
131 match top_ir {
132 IR::SinkMultiple { inputs } => {
133 let phys_node = &phys_sm[root_phys_node];
134 let PhysNodeKind::SinkMultiple { sinks } = phys_node.kind() else {
135 unreachable!();
136 };
137
138 Ok(QueryResult::Multiple(
139 sinks
140 .iter()
141 .map(|phys_node_key| {
142 results
143 .remove(phys_to_graph[*phys_node_key])
144 .unwrap_or_else(DataFrame::empty)
145 })
146 .collect(),
147 ))
148 },
149 _ => Ok(QueryResult::Single(
150 results
151 .remove(phys_to_graph[root_phys_node])
152 .unwrap_or_else(DataFrame::empty),
153 )),
154 }
155 }
156}
157
158pub enum QueryResult {
159 Single(DataFrame),
160 Multiple(Vec<DataFrame>),
162}
163
164impl QueryResult {
165 pub fn unwrap_single(self) -> DataFrame {
166 use QueryResult::*;
167 match self {
168 Single(df) => df,
169 Multiple(_) => panic!(),
170 }
171 }
172
173 pub fn unwrap_multiple(self) -> Vec<DataFrame> {
174 use QueryResult::*;
175 match self {
176 Single(_) => panic!(),
177 Multiple(dfs) => dfs,
178 }
179 }
180}