1use std::sync::Arc;
2use std::time::Duration;
3
4pub use polars_io::metrics::{IOMetrics, OptIOMetrics};
5use slotmap::{SecondaryMap, SlotMap};
6
7use crate::LogicalPipe;
8use crate::async_executor::TaskMetrics;
9use crate::graph::{GraphNodeKey, LogicalPipeKey};
10use crate::pipe::PipeMetrics;
11
/// Aggregated runtime metrics for a single node of the streaming graph.
///
/// Task, IO and pipe counters are folded in by `GraphMetrics::flush`; state-update
/// timings are recorded by `GraphMetrics::start_state_update` /
/// `GraphMetrics::stop_state_update`.
#[derive(Default, Clone)]
pub struct NodeMetrics {
    /// Sum of `TaskMetrics::total_polls` over the node's flushed tasks.
    pub total_polls: u64,
    /// Sum of `TaskMetrics::total_stolen_polls` over the node's flushed tasks.
    pub total_stolen_polls: u64,
    /// Sum of `TaskMetrics::total_poll_time_ns` over the node's flushed tasks.
    pub total_poll_time_ns: u64,
    /// Maximum `TaskMetrics::max_poll_time_ns` seen over the node's flushed tasks.
    pub max_poll_time_ns: u64,

    /// Number of completed state updates (incremented by `stop_state_update`).
    pub total_state_updates: u64,
    /// Summed duration of all completed state updates, in nanoseconds.
    pub total_state_update_time_ns: u64,
    /// Longest single state update, in nanoseconds.
    pub max_state_update_time_ns: u64,

    /// Morsels sent over pipes where this node is the sender.
    pub morsels_sent: u64,
    /// Rows sent over pipes where this node is the sender.
    pub rows_sent: u64,
    /// Largest `largest_morsel_sent` value reported by any of those pipes.
    pub largest_morsel_sent: u64,
    /// Morsels received over pipes where this node is the receiver.
    pub morsels_received: u64,
    /// Rows received over pipes where this node is the receiver.
    pub rows_received: u64,
    /// Largest `largest_morsel_received` value reported by any of those pipes.
    pub largest_morsel_received: u64,

    /// Sum of `io_timer.total_time_live_ns()` over the node's flushed IO metrics.
    pub io_total_active_ns: u64,
    /// Sum of `bytes_requested` over the node's flushed IO metrics.
    pub io_total_bytes_requested: u64,
    /// Sum of `bytes_received` over the node's flushed IO metrics.
    pub io_total_bytes_received: u64,

    /// `true` between `start_state_update` and the matching `stop_state_update`.
    pub state_update_in_progress: bool,
    /// Number of tasks whose `done` flag was still unset at the last flush.
    pub num_running_tasks: u32,
    /// The `is_done` value passed to the most recent `stop_state_update`.
    pub done: bool,
}
38
39impl NodeMetrics {
40 fn add_task(&mut self, task_metrics: &TaskMetrics) {
41 self.total_polls += task_metrics.total_polls.load();
42 self.total_stolen_polls += task_metrics.total_stolen_polls.load();
43 self.total_poll_time_ns += task_metrics.total_poll_time_ns.load();
44 self.max_poll_time_ns = self
45 .max_poll_time_ns
46 .max(task_metrics.max_poll_time_ns.load());
47 self.num_running_tasks += (!task_metrics.done.load()) as u32;
48 }
49
50 fn add_io_recv(&mut self, io_metrics: &IOMetrics) {
51 self.io_total_active_ns += io_metrics.io_timer.total_time_live_ns();
52 self.io_total_bytes_requested += io_metrics.bytes_requested.load();
53 self.io_total_bytes_received += io_metrics.bytes_received.load();
54 }
55
56 fn start_state_update(&mut self) {
57 self.state_update_in_progress = true;
58 }
59
60 fn stop_state_update(&mut self, time: Duration, is_done: bool) {
61 let time_ns = time.as_nanos() as u64;
62 self.total_state_updates += 1;
63 self.total_state_update_time_ns += time_ns;
64 self.max_state_update_time_ns = self.max_state_update_time_ns.max(time_ns);
65 self.state_update_in_progress = false;
66 self.done = is_done;
67 }
68
69 fn add_send_metrics(&mut self, pipe_metrics: &PipeMetrics) {
70 self.morsels_sent += pipe_metrics.morsels_sent.load();
71 self.rows_sent += pipe_metrics.rows_sent.load();
72 self.largest_morsel_sent = self
73 .largest_morsel_sent
74 .max(pipe_metrics.largest_morsel_sent.load());
75 }
76
77 fn add_recv_metrics(&mut self, pipe_metrics: &PipeMetrics) {
78 self.morsels_received += pipe_metrics.morsels_received.load();
79 self.rows_received += pipe_metrics.rows_received.load();
80 self.largest_morsel_received = self
81 .largest_morsel_received
82 .max(pipe_metrics.largest_morsel_received.load());
83 }
84}
85
/// Metrics for every node of a streaming graph.
///
/// Task, IO and pipe metric handles are first registered in the `in_progress_*`
/// maps; `GraphMetrics::flush` drains them into `node_metrics`.
#[derive(Default, Clone)]
pub struct GraphMetrics {
    // Aggregated per-node metrics, as exposed through `get` / `iter`.
    node_metrics: SecondaryMap<GraphNodeKey, NodeMetrics>,
    // Registered IO metric handles not yet folded in by `flush`.
    in_progress_io_metrics: SecondaryMap<GraphNodeKey, Vec<Arc<IOMetrics>>>,
    // Registered task metric handles not yet folded in by `flush`.
    in_progress_task_metrics: SecondaryMap<GraphNodeKey, Vec<Arc<TaskMetrics>>>,
    // Registered pipe metric handles, keyed by pipe; attributed to the pipe's
    // sender and receiver nodes by `flush`.
    in_progress_pipe_metrics: SecondaryMap<LogicalPipeKey, Vec<Arc<PipeMetrics>>>,
}
93
94impl GraphMetrics {
95 pub fn add_task(&mut self, key: GraphNodeKey, task_metrics: Arc<TaskMetrics>) {
96 self.in_progress_task_metrics
97 .entry(key)
98 .unwrap()
99 .or_default()
100 .push(task_metrics);
101 }
102
103 pub fn add_pipe(&mut self, key: LogicalPipeKey, pipe_metrics: Arc<PipeMetrics>) {
104 self.in_progress_pipe_metrics
105 .entry(key)
106 .unwrap()
107 .or_default()
108 .push(pipe_metrics);
109 }
110
111 pub fn start_state_update(&mut self, key: GraphNodeKey) {
112 self.node_metrics
113 .entry(key)
114 .unwrap()
115 .or_default()
116 .start_state_update();
117 }
118
119 pub fn stop_state_update(&mut self, key: GraphNodeKey, time: Duration, is_done: bool) {
120 self.node_metrics[key].stop_state_update(time, is_done);
121 }
122
123 pub fn flush(&mut self, pipes: &SlotMap<LogicalPipeKey, LogicalPipe>) {
124 for (key, in_progress_task_metrics) in self.in_progress_task_metrics.iter_mut() {
125 let this_node_metrics = self.node_metrics.entry(key).unwrap().or_default();
126 this_node_metrics.num_running_tasks = 0;
127 for task_metrics in in_progress_task_metrics.drain(..) {
128 this_node_metrics.add_task(&task_metrics);
129 }
130 }
131
132 for (key, in_progress_io_metrics) in self.in_progress_io_metrics.iter_mut() {
133 let this_node_metrics = self.node_metrics.entry(key).unwrap().or_default();
134 this_node_metrics.num_running_tasks = 0;
135 for io_metrics in in_progress_io_metrics.drain(..) {
136 this_node_metrics.add_io_recv(&io_metrics);
137 }
138 }
139
140 for (key, in_progress_pipe_metrics) in self.in_progress_pipe_metrics.iter_mut() {
141 for pipe_metrics in in_progress_pipe_metrics.drain(..) {
142 let pipe = &pipes[key];
143 self.node_metrics
144 .entry(pipe.receiver)
145 .unwrap()
146 .or_default()
147 .add_recv_metrics(&pipe_metrics);
148 self.node_metrics
149 .entry(pipe.sender)
150 .unwrap()
151 .or_default()
152 .add_send_metrics(&pipe_metrics);
153 }
154 }
155 }
156
157 pub fn get(&self, key: GraphNodeKey) -> Option<&NodeMetrics> {
158 self.node_metrics.get(key)
159 }
160
161 pub fn iter(&self) -> slotmap::secondary::Iter<'_, GraphNodeKey, NodeMetrics> {
162 self.node_metrics.iter()
163 }
164}
165
/// Creates metrics handles for one graph node and registers them in a shared
/// [`GraphMetrics`].
pub struct MetricsBuilder {
    /// Node that handles created by this builder are registered under.
    pub graph_key: GraphNodeKey,
    /// Shared graph metrics the created handles are registered in.
    pub graph_metrics: Arc<parking_lot::Mutex<GraphMetrics>>,
}
170
171impl MetricsBuilder {
172 pub fn new_io_metrics(&self) -> Arc<IOMetrics> {
173 let io_metrics: Arc<IOMetrics> = Default::default();
174
175 self.graph_metrics
176 .lock()
177 .in_progress_io_metrics
178 .entry(self.graph_key)
179 .unwrap()
180 .or_default()
181 .push(Arc::clone(&io_metrics));
182
183 io_metrics
184 }
185}