spacetimedb/worker_metrics/
mod.rs1use crate::hash::Hash;
2use once_cell::sync::Lazy;
3use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
4use spacetimedb_datastore::execution_context::WorkloadType;
5use spacetimedb_lib::{ConnectionId, Identity};
6use spacetimedb_metrics::metrics_group;
7use spacetimedb_sats::memory_usage::MemoryUsage;
8use spacetimedb_table::page_pool::PagePool;
9use std::{sync::Once, time::Duration};
10use tokio::{spawn, time::sleep};
11
12metrics_group!(
13 pub struct WorkerMetrics {
14 #[name = spacetime_worker_connected_clients]
15 #[help = "Number of clients connected to the worker."]
16 #[labels(database_identity: Identity)]
17 pub connected_clients: IntGaugeVec,
18
19 #[name = spacetime_worker_ws_clients_spawned]
20 #[help = "Number of new ws client connections spawned. Counted after any on_connect reducers are run."]
21 #[labels(database_identity: Identity)]
22 pub ws_clients_spawned: IntGaugeVec,
23
24 #[name = spacetime_worker_ws_clients_aborted]
25 #[help = "Number of ws client connections aborted by either the server or the client"]
26 #[labels(database_identity: Identity)]
27 pub ws_clients_aborted: IntGaugeVec,
28
29 #[name = spacetime_worker_ws_clients_closed_connection]
30 #[help = "Number of ws client connections closed by the client as opposed to being termiated by the server"]
31 #[labels(database_identity: Identity)]
32 pub ws_clients_closed_connection: IntGaugeVec,
33
34 #[name = spacetime_websocket_requests_total]
35 #[help = "The cumulative number of websocket request messages"]
36 #[labels(database_identity: Identity, protocol: str)]
37 pub websocket_requests: IntCounterVec,
38
39 #[name = spacetime_websocket_request_msg_size]
40 #[help = "The size of messages received on connected sessions"]
41 #[labels(database_identity: Identity, protocol: str)]
42 pub websocket_request_msg_size: HistogramVec,
43
44 #[name = jemalloc_active_bytes]
45 #[help = "Number of bytes in jemallocs heap"]
46 #[labels(node_id: str)]
47 pub jemalloc_active_bytes: IntGaugeVec,
48
49 #[name = jemalloc_allocated_bytes]
50 #[help = "Number of bytes in use by the application"]
51 #[labels(node_id: str)]
52 pub jemalloc_allocated_bytes: IntGaugeVec,
53
54 #[name = jemalloc_resident_bytes]
55 #[help = "Total memory used by jemalloc"]
56 #[labels(node_id: str)]
57 pub jemalloc_resident_bytes: IntGaugeVec,
58
59 #[name = page_pool_resident_bytes]
60 #[help = "Total memory used by the page pool"]
61 #[labels(node_id: str)]
62 pub page_pool_resident_bytes: IntGaugeVec,
63
64 #[name = page_pool_dropped_pages]
65 #[help = "Total number of pages dropped by the page pool"]
66 #[labels(node_id: str)]
67 pub page_pool_dropped_pages: IntGaugeVec,
68
69 #[name = page_pool_new_pages_allocated]
70 #[help = "Total number of fresh pages allocated by the page pool"]
71 #[labels(node_id: str)]
72 pub page_pool_new_pages_allocated: IntGaugeVec,
73
74 #[name = page_pool_pages_reused]
75 #[help = "Total number of pages reused by the page pool"]
76 #[labels(node_id: str)]
77 pub page_pool_pages_reused: IntGaugeVec,
78
79 #[name = page_pool_pages_returned]
80 #[help = "Total number of pages returned to the page pool"]
81 #[labels(node_id: str)]
82 pub page_pool_pages_returned: IntGaugeVec,
83
84 #[name = tokio_num_workers]
85 #[help = "Number of core tokio workers"]
86 #[labels(node_id: str)]
87 pub tokio_num_workers: IntGaugeVec,
88
89 #[name = tokio_num_blocking_threads]
90 #[help = "Number of extra tokio threads for blocking tasks"]
91 #[labels(node_id: str)]
92 pub tokio_num_blocking_threads: IntGaugeVec,
93
94 #[name = tokio_num_idle_blocking_threads]
95 #[help = "Number of tokio blocking threads that are idle"]
96 #[labels(node_id: str)]
97 pub tokio_num_idle_blocking_threads: IntGaugeVec,
98
99 #[name = tokio_num_alive_tasks]
100 #[help = "Number of tokio tasks that are still alive"]
101 #[labels(node_id: str)]
102 pub tokio_num_alive_tasks: IntGaugeVec,
103
104 #[name = tokio_global_queue_depth]
105 #[help = "Number of tasks in tokios global queue"]
106 #[labels(node_id: str)]
107 pub tokio_global_queue_depth: IntGaugeVec,
108
109 #[name = tokio_blocking_queue_depth]
110 #[help = "Number of tasks in tokios blocking task queue"]
111 #[labels(node_id: str)]
112 pub tokio_blocking_queue_depth: IntGaugeVec,
113
114 #[name = tokio_spawned_tasks_count]
115 #[help = "Number of tokio tasks spawned"]
116 #[labels(node_id: str)]
117 pub tokio_spawned_tasks_count: IntCounterVec,
118
119 #[name = tokio_remote_schedule_count]
120 #[help = "Number of tasks spawned from outside the tokio runtime"]
121 #[labels(node_id: str)]
122 pub tokio_remote_schedule_count: IntCounterVec,
123
124 #[name = tokio_local_queue_depth_total]
125 #[help = "Total size of all tokio workers local queues"]
126 #[labels(node_id: str)]
127 pub tokio_local_queue_depth_total: IntGaugeVec,
128
129 #[name = tokio_local_queue_depth_max]
130 #[help = "Length of the longest tokio worker local queue"]
131 #[labels(node_id: str)]
132 pub tokio_local_queue_depth_max: IntGaugeVec,
133
134 #[name = tokio_local_queue_depth_min]
135 #[help = "Length of the shortest tokio worker local queue"]
136 #[labels(node_id: str)]
137 pub tokio_local_queue_depth_min: IntGaugeVec,
138
139 #[name = tokio_steal_total]
140 #[help = "Total number of tasks stolen from other workers"]
141 #[labels(node_id: str)]
142 pub tokio_steal_total: IntCounterVec,
143
144 #[name = tokio_steal_operations_total]
145 #[help = "Total number of times a worker tried to steal a chunk of tasks"]
146 #[labels(node_id: str)]
147 pub tokio_steal_operations_total: IntCounterVec,
148
149 #[name = tokio_local_schedule_total]
150 #[help = "Total number of tasks scheduled from worker threads"]
151 #[labels(node_id: str)]
152 pub tokio_local_schedule_total: IntCounterVec,
153
154 #[name = tokio_overflow_total]
155 #[help = "Total number of times a tokio worker overflowed its local queue"]
156 #[labels(node_id: str)]
157 pub tokio_overflow_total: IntCounterVec,
158
159 #[name = tokio_busy_ratio_min]
160 #[help = "Busy ratio of the least busy tokio worker"]
161 #[labels(node_id: str)]
162 pub tokio_busy_ratio_min: GaugeVec,
163
164 #[name = tokio_busy_ratio_max]
165 #[help = "Busy ratio of the most busy tokio worker"]
166 #[labels(node_id: str)]
167 pub tokio_busy_ratio_max: GaugeVec,
168
169 #[name = tokio_busy_ratio_avg]
170 #[help = "Avg busy ratio of tokio workers"]
171 #[labels(node_id: str)]
172 pub tokio_busy_ratio_avg: GaugeVec,
173
174 #[name = tokio_mean_polls_per_park]
175 #[help = "Number of tasks polls divided by the times an idle worker was parked"]
176 #[labels(node_id: str)]
177 pub tokio_mean_polls_per_park: GaugeVec,
178
179 #[name = spacetime_websocket_sent_msg_size_bytes]
180 #[help = "The size of messages sent to connected sessions"]
181 #[labels(db: Identity, workload: WorkloadType)]
182 #[buckets(100, 500, 1e3, 10e3, 100e3, 500e3, 1e6, 5e6, 10e6, 25e6, 50e6, 75e6, 100e6, 500e6)]
192 pub websocket_sent_msg_size: HistogramVec,
193
194 #[name = spacetime_websocket_sent_num_rows]
195 #[help = "The number of rows sent to connected sessions"]
196 #[labels(db: Identity, workload: WorkloadType)]
197 #[buckets(5, 10, 50, 100, 500, 1e3, 5e3, 10e3, 50e3, 100e3, 250e3, 500e3, 750e3, 1e6, 5e6)]
207 pub websocket_sent_num_rows: HistogramVec,
208
209 #[name = spacetime_websocket_serialize_secs]
210 #[help = "How long it took to serialize and maybe compress an outgoing websocket message"]
211 #[labels(db: Identity)]
212 #[buckets(0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0)]
213 pub websocket_serialize_secs: HistogramVec,
214
215 #[name = spacetime_worker_instance_operation_queue_length]
216 #[help = "Length of the wait queue for access to a module instance."]
217 #[labels(database_identity: Identity)]
218 pub instance_queue_length: IntGaugeVec,
219
220 #[name = spacetime_worker_instance_operation_queue_length_histogram]
221 #[help = "Length of the wait queue for access to a module instance."]
222 #[labels(database_identity: Identity)]
223 #[buckets(0, 1, 2, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 1000)]
228 pub instance_queue_length_histogram: HistogramVec,
229
230 #[name = spacetime_reducer_wait_time_sec]
231 #[help = "The amount of time (in seconds) a reducer spends in the queue waiting to run"]
232 #[labels(db: Identity, reducer: str)]
233 #[buckets(100e-6, 500e-6, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10)]
241 pub reducer_wait_time: HistogramVec,
242
243 #[name = spacetime_worker_wasm_instance_errors_total]
244 #[help = "The number of fatal WASM instance errors, such as reducer panics."]
245 #[labels(caller_identity: Identity, module_hash: Hash, caller_connection_id: ConnectionId, reducer_symbol: str)]
246 pub wasm_instance_errors: IntCounterVec,
247
248 #[name = spacetime_worker_wasm_memory_bytes]
249 #[help = "The number of bytes of linear memory allocated by the database's WASM module instance"]
250 #[labels(database_identity: Identity)]
251 pub wasm_memory_bytes: IntGaugeVec,
252
253 #[name = spacetime_active_queries]
254 #[help = "The number of active subscription queries"]
255 #[labels(database_identity: Identity)]
256 pub subscription_queries: IntGaugeVec,
257
258 #[name = spacetime_request_round_trip_time]
259 #[help = "The total time it takes for request to complete"]
260 #[labels(txn_type: WorkloadType, database_identity: Identity, reducer_symbol: str)]
261 pub request_round_trip: HistogramVec,
262
263 #[name = spacetime_reducer_plus_query_duration_sec]
264 #[help = "The time spent executing a reducer (in seconds), plus the time spent evaluating its subscription queries"]
265 #[labels(db: Identity, reducer: str)]
266 pub reducer_plus_query_duration: HistogramVec,
267
268 #[name = spacetime_num_bytes_sent_to_clients_total]
269 #[help = "The cumulative number of bytes sent to clients"]
270 #[labels(txn_type: WorkloadType, db: Identity)]
271 pub bytes_sent_to_clients: IntCounterVec,
272
273 #[name = spacetime_subscription_send_queue_length]
274 #[help = "The number of `ComputedQueries` waiting in the queue to be aggregated and broadcast by the `send_worker`"]
275 #[labels(database_identity: Identity)]
276 pub subscription_send_queue_length: IntGaugeVec,
277
278 #[name = spacetime_total_incoming_queue_length]
279 #[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"]
280 #[labels(db: Identity)]
281 pub total_incoming_queue_length: IntGaugeVec,
282
283 #[name = spacetime_total_outgoing_queue_length]
284 #[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
285 #[labels(db: Identity)]
286 pub total_outgoing_queue_length: IntGaugeVec,
287 }
288);
289
/// Global, lazily-initialized handle to this worker's Prometheus metrics;
/// the `WorkerMetrics` struct is constructed on first access.
pub static WORKER_METRICS: Lazy<WorkerMetrics> = Lazy::new(WorkerMetrics::new);
291
292#[cfg(not(target_env = "msvc"))]
293use tikv_jemalloc_ctl::{epoch, stats};
294
295#[cfg(not(target_env = "msvc"))]
296static SPAWN_JEMALLOC_GUARD: Once = Once::new();
297pub fn spawn_jemalloc_stats(_node_id: String) {
298 #[cfg(not(target_env = "msvc"))]
299 SPAWN_JEMALLOC_GUARD.call_once(|| {
300 spawn(async move {
301 let allocated_bytes = WORKER_METRICS.jemalloc_allocated_bytes.with_label_values(&_node_id);
302 let resident_bytes = WORKER_METRICS.jemalloc_resident_bytes.with_label_values(&_node_id);
303 let active_bytes = WORKER_METRICS.jemalloc_active_bytes.with_label_values(&_node_id);
304
305 let e = epoch::mib().unwrap();
306 loop {
307 e.advance().unwrap();
308
309 let allocated = stats::allocated::read().unwrap();
310 let resident = stats::resident::read().unwrap();
311 let active = stats::active::read().unwrap();
312
313 allocated_bytes.set(allocated as i64);
314 resident_bytes.set(resident as i64);
315 active_bytes.set(active as i64);
316
317 sleep(Duration::from_secs(10)).await;
318 }
319 });
320 });
321}
322
323static SPAWN_PAGE_POOL_GUARD: Once = Once::new();
324pub fn spawn_page_pool_stats(node_id: String, page_pool: PagePool) {
325 SPAWN_PAGE_POOL_GUARD.call_once(|| {
326 spawn(async move {
327 let resident_bytes = WORKER_METRICS.page_pool_resident_bytes.with_label_values(&node_id);
328 let dropped_pages = WORKER_METRICS.page_pool_dropped_pages.with_label_values(&node_id);
329 let new_pages = WORKER_METRICS.page_pool_new_pages_allocated.with_label_values(&node_id);
330 let reused_pages = WORKER_METRICS.page_pool_pages_reused.with_label_values(&node_id);
331 let returned_pages = WORKER_METRICS.page_pool_pages_returned.with_label_values(&node_id);
332
333 loop {
334 resident_bytes.set(page_pool.heap_usage() as i64);
335 dropped_pages.set(page_pool.dropped_count() as i64);
336 new_pages.set(page_pool.new_allocated_count() as i64);
337 reused_pages.set(page_pool.reused_count() as i64);
338 returned_pages.set(page_pool.reused_count() as i64);
339
340 sleep(Duration::from_secs(10)).await;
341 }
342 });
343 });
344}
345
// Tokio runtime metrics require 64-bit atomics and a build with
// `--cfg tokio_unstable`; everything below is compiled out otherwise.
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
static SPAWN_TOKIO_STATS_GUARD: Once = Once::new();

/// Spawns a background task that samples tokio runtime metrics every
/// `TOKIO_STATS_INTERVAL` and publishes them labeled with `node_id`.
///
/// Guarded by a `Once`, so repeated calls spawn at most one sampler task.
/// If the build lacks `tokio_unstable` (or 64-bit atomics), this only logs a
/// warning and returns. Must be called from within a tokio runtime.
pub fn spawn_tokio_stats(node_id: String) {
    #[cfg(not(all(target_has_atomic = "64", tokio_unstable)))]
    log::warn!("Skipping tokio metrics for {node_id}, as they are not enabled in this build.");

    #[cfg(all(target_has_atomic = "64", tokio_unstable))]
    SPAWN_TOKIO_STATS_GUARD.call_once(|| {
        spawn(async move {
            // Resolve all labeled metric handles once, outside the loop.
            let num_worker_metric = WORKER_METRICS.tokio_num_workers.with_label_values(&node_id);
            let num_blocking_threads_metric = WORKER_METRICS.tokio_num_blocking_threads.with_label_values(&node_id);
            let num_alive_tasks_metric = WORKER_METRICS.tokio_num_alive_tasks.with_label_values(&node_id);
            let global_queue_depth_metric = WORKER_METRICS.tokio_global_queue_depth.with_label_values(&node_id);
            let num_idle_blocking_threads_metric = WORKER_METRICS
                .tokio_num_idle_blocking_threads
                .with_label_values(&node_id);
            let blocking_queue_depth_metric = WORKER_METRICS.tokio_blocking_queue_depth.with_label_values(&node_id);
            let spawned_tasks_count_metric = WORKER_METRICS.tokio_spawned_tasks_count.with_label_values(&node_id);
            let remote_schedule_count_metric = WORKER_METRICS.tokio_remote_schedule_count.with_label_values(&node_id);

            let local_queue_depth_total_metric =
                WORKER_METRICS.tokio_local_queue_depth_total.with_label_values(&node_id);
            let local_queue_depth_max_metric = WORKER_METRICS.tokio_local_queue_depth_max.with_label_values(&node_id);
            let local_queue_depth_min_metric = WORKER_METRICS.tokio_local_queue_depth_min.with_label_values(&node_id);
            let steal_total_metric = WORKER_METRICS.tokio_steal_total.with_label_values(&node_id);
            let steal_operations_total_metric = WORKER_METRICS.tokio_steal_operations_total.with_label_values(&node_id);
            let local_schedule_total_metric = WORKER_METRICS.tokio_local_schedule_total.with_label_values(&node_id);
            let overflow_total_metric = WORKER_METRICS.tokio_overflow_total.with_label_values(&node_id);
            let busy_ratio_min_metric = WORKER_METRICS.tokio_busy_ratio_min.with_label_values(&node_id);
            let busy_ratio_max_metric = WORKER_METRICS.tokio_busy_ratio_max.with_label_values(&node_id);
            let busy_ratio_avg_metric = WORKER_METRICS.tokio_busy_ratio_avg.with_label_values(&node_id);
            let mean_polls_per_park_metric = WORKER_METRICS.tokio_mean_polls_per_park.with_label_values(&node_id);

            // tokio_metrics' RuntimeMonitor yields per-interval deltas
            // between successive calls to `intervals.next()`.
            let handle = tokio::runtime::Handle::current();
            let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
            let mut intervals = runtime_monitor.intervals();
            loop {
                let metrics = tokio::runtime::Handle::current().metrics();
                let interval_delta = intervals.next();

                // Point-in-time runtime state, exported as gauges.
                num_worker_metric.set(metrics.num_workers() as i64);
                num_alive_tasks_metric.set(metrics.num_alive_tasks() as i64);
                global_queue_depth_metric.set(metrics.global_queue_depth() as i64);
                num_blocking_threads_metric.set(metrics.num_blocking_threads() as i64);
                num_idle_blocking_threads_metric.set(metrics.num_idle_blocking_threads() as i64);
                blocking_queue_depth_metric.set(metrics.blocking_queue_depth() as i64);

                // The runtime exposes cumulative totals, while our metric is a
                // Prometheus counter (inc-only); bump it by the delta since the
                // last sample. `checked_sub` skips the update if the runtime
                // total is somehow behind what we already recorded.
                {
                    let current_count = metrics.spawned_tasks_count();
                    let previous_value = spawned_tasks_count_metric.get();
                    if let Some(diff) = current_count.checked_sub(previous_value) {
                        spawned_tasks_count_metric.inc_by(diff);
                    }
                }
                // Same cumulative-to-counter conversion for remote schedules.
                {
                    let current_count = metrics.remote_schedule_count();
                    let previous_value = remote_schedule_count_metric.get();
                    if let Some(diff) = current_count.checked_sub(previous_value) {
                        remote_schedule_count_metric.inc_by(diff);
                    }
                }

                // Interval-based stats; `None` on the (unlikely) end of the
                // monitor's interval stream.
                if let Some(interval_delta) = interval_delta {
                    local_queue_depth_total_metric.set(interval_delta.total_local_queue_depth as i64);
                    local_queue_depth_max_metric.set(interval_delta.max_local_queue_depth as i64);
                    local_queue_depth_min_metric.set(interval_delta.min_local_queue_depth as i64);
                    steal_total_metric.inc_by(interval_delta.total_steal_count);
                    steal_operations_total_metric.inc_by(interval_delta.total_steal_operations);
                    local_schedule_total_metric.inc_by(interval_delta.total_local_schedule_count);
                    overflow_total_metric.inc_by(interval_delta.total_overflow_count);
                    mean_polls_per_park_metric.set(interval_delta.mean_polls_per_park());

                    // Only compute busy ratios over a meaningfully long
                    // interval — presumably to avoid noisy/degenerate ratios
                    // on near-zero elapsed time (TODO confirm threshold).
                    if interval_delta.elapsed.as_millis() > 100 {
                        busy_ratio_avg_metric.set(interval_delta.busy_ratio());
                        busy_ratio_min_metric.set(
                            interval_delta.min_busy_duration.as_nanos() as f64
                                / interval_delta.elapsed.as_nanos() as f64,
                        );
                        busy_ratio_max_metric.set(
                            interval_delta.max_busy_duration.as_nanos() as f64
                                / interval_delta.elapsed.as_nanos() as f64,
                        );
                    }
                }
                sleep(TOKIO_STATS_INTERVAL).await;
            }
        });
    });
}