spacetimedb/worker_metrics/
mod.rs

1use crate::hash::Hash;
2use once_cell::sync::Lazy;
3use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
4use spacetimedb_datastore::execution_context::WorkloadType;
5use spacetimedb_lib::{ConnectionId, Identity};
6use spacetimedb_metrics::metrics_group;
7use spacetimedb_sats::memory_usage::MemoryUsage;
8use spacetimedb_table::page_pool::PagePool;
9use std::{sync::Once, time::Duration};
10use tokio::{spawn, time::sleep};
11
12metrics_group!(
13    pub struct WorkerMetrics {
14        #[name = spacetime_worker_connected_clients]
15        #[help = "Number of clients connected to the worker."]
16        #[labels(database_identity: Identity)]
17        pub connected_clients: IntGaugeVec,
18
19        #[name = spacetime_worker_ws_clients_spawned]
20        #[help = "Number of new ws client connections spawned. Counted after any on_connect reducers are run."]
21        #[labels(database_identity: Identity)]
22        pub ws_clients_spawned: IntGaugeVec,
23
24        #[name = spacetime_worker_ws_clients_aborted]
25        #[help = "Number of ws client connections aborted by either the server or the client"]
26        #[labels(database_identity: Identity)]
27        pub ws_clients_aborted: IntGaugeVec,
28
29        #[name = spacetime_worker_ws_clients_closed_connection]
30        #[help = "Number of ws client connections closed by the client as opposed to being termiated by the server"]
31        #[labels(database_identity: Identity)]
32        pub ws_clients_closed_connection: IntGaugeVec,
33
34        #[name = spacetime_websocket_requests_total]
35        #[help = "The cumulative number of websocket request messages"]
36        #[labels(database_identity: Identity, protocol: str)]
37        pub websocket_requests: IntCounterVec,
38
39        #[name = spacetime_websocket_request_msg_size]
40        #[help = "The size of messages received on connected sessions"]
41        #[labels(database_identity: Identity, protocol: str)]
42        pub websocket_request_msg_size: HistogramVec,
43
44        #[name = jemalloc_active_bytes]
45        #[help = "Number of bytes in jemallocs heap"]
46        #[labels(node_id: str)]
47        pub jemalloc_active_bytes: IntGaugeVec,
48
49        #[name = jemalloc_allocated_bytes]
50        #[help = "Number of bytes in use by the application"]
51        #[labels(node_id: str)]
52        pub jemalloc_allocated_bytes: IntGaugeVec,
53
54        #[name = jemalloc_resident_bytes]
55        #[help = "Total memory used by jemalloc"]
56        #[labels(node_id: str)]
57        pub jemalloc_resident_bytes: IntGaugeVec,
58
59        #[name = page_pool_resident_bytes]
60        #[help = "Total memory used by the page pool"]
61        #[labels(node_id: str)]
62        pub page_pool_resident_bytes: IntGaugeVec,
63
64        #[name = page_pool_dropped_pages]
65        #[help = "Total number of pages dropped by the page pool"]
66        #[labels(node_id: str)]
67        pub page_pool_dropped_pages: IntGaugeVec,
68
69        #[name = page_pool_new_pages_allocated]
70        #[help = "Total number of fresh pages allocated by the page pool"]
71        #[labels(node_id: str)]
72        pub page_pool_new_pages_allocated: IntGaugeVec,
73
74        #[name = page_pool_pages_reused]
75        #[help = "Total number of pages reused by the page pool"]
76        #[labels(node_id: str)]
77        pub page_pool_pages_reused: IntGaugeVec,
78
79        #[name = page_pool_pages_returned]
80        #[help = "Total number of pages returned to the page pool"]
81        #[labels(node_id: str)]
82        pub page_pool_pages_returned: IntGaugeVec,
83
84        #[name = tokio_num_workers]
85        #[help = "Number of core tokio workers"]
86        #[labels(node_id: str)]
87        pub tokio_num_workers: IntGaugeVec,
88
89        #[name = tokio_num_blocking_threads]
90        #[help = "Number of extra tokio threads for blocking tasks"]
91        #[labels(node_id: str)]
92        pub tokio_num_blocking_threads: IntGaugeVec,
93
94        #[name = tokio_num_idle_blocking_threads]
95        #[help = "Number of tokio blocking threads that are idle"]
96        #[labels(node_id: str)]
97        pub tokio_num_idle_blocking_threads: IntGaugeVec,
98
99        #[name = tokio_num_alive_tasks]
100        #[help = "Number of tokio tasks that are still alive"]
101        #[labels(node_id: str)]
102        pub tokio_num_alive_tasks: IntGaugeVec,
103
104        #[name = tokio_global_queue_depth]
105        #[help = "Number of tasks in tokios global queue"]
106        #[labels(node_id: str)]
107        pub tokio_global_queue_depth: IntGaugeVec,
108
109        #[name = tokio_blocking_queue_depth]
110        #[help = "Number of tasks in tokios blocking task queue"]
111        #[labels(node_id: str)]
112        pub tokio_blocking_queue_depth: IntGaugeVec,
113
114        #[name = tokio_spawned_tasks_count]
115        #[help = "Number of tokio tasks spawned"]
116        #[labels(node_id: str)]
117        pub tokio_spawned_tasks_count: IntCounterVec,
118
119        #[name = tokio_remote_schedule_count]
120        #[help = "Number of tasks spawned from outside the tokio runtime"]
121        #[labels(node_id: str)]
122        pub tokio_remote_schedule_count: IntCounterVec,
123
124        #[name = tokio_local_queue_depth_total]
125        #[help = "Total size of all tokio workers local queues"]
126        #[labels(node_id: str)]
127        pub tokio_local_queue_depth_total: IntGaugeVec,
128
129        #[name = tokio_local_queue_depth_max]
130        #[help = "Length of the longest tokio worker local queue"]
131        #[labels(node_id: str)]
132        pub tokio_local_queue_depth_max: IntGaugeVec,
133
134        #[name = tokio_local_queue_depth_min]
135        #[help = "Length of the shortest tokio worker local queue"]
136        #[labels(node_id: str)]
137        pub tokio_local_queue_depth_min: IntGaugeVec,
138
139        #[name = tokio_steal_total]
140        #[help = "Total number of tasks stolen from other workers"]
141        #[labels(node_id: str)]
142        pub tokio_steal_total: IntCounterVec,
143
144        #[name = tokio_steal_operations_total]
145        #[help = "Total number of times a worker tried to steal a chunk of tasks"]
146        #[labels(node_id: str)]
147        pub tokio_steal_operations_total: IntCounterVec,
148
149        #[name = tokio_local_schedule_total]
150        #[help = "Total number of tasks scheduled from worker threads"]
151        #[labels(node_id: str)]
152        pub tokio_local_schedule_total: IntCounterVec,
153
154        #[name = tokio_overflow_total]
155        #[help = "Total number of times a tokio worker overflowed its local queue"]
156        #[labels(node_id: str)]
157        pub tokio_overflow_total: IntCounterVec,
158
159        #[name = tokio_busy_ratio_min]
160        #[help = "Busy ratio of the least busy tokio worker"]
161        #[labels(node_id: str)]
162        pub tokio_busy_ratio_min: GaugeVec,
163
164        #[name = tokio_busy_ratio_max]
165        #[help = "Busy ratio of the most busy tokio worker"]
166        #[labels(node_id: str)]
167        pub tokio_busy_ratio_max: GaugeVec,
168
169        #[name = tokio_busy_ratio_avg]
170        #[help = "Avg busy ratio of tokio workers"]
171        #[labels(node_id: str)]
172        pub tokio_busy_ratio_avg: GaugeVec,
173
174        #[name = tokio_mean_polls_per_park]
175        #[help = "Number of tasks polls divided by the times an idle worker was parked"]
176        #[labels(node_id: str)]
177        pub tokio_mean_polls_per_park: GaugeVec,
178
179        #[name = spacetime_websocket_sent_msg_size_bytes]
180        #[help = "The size of messages sent to connected sessions"]
181        #[labels(db: Identity, workload: WorkloadType)]
182        // Prometheus histograms have default buckets,
183        // which broadly speaking,
184        // are tailored to measure the response time of a network service.
185        //
186        // Therefore we define specific buckets for this metric,
187        // since it has a different unit and a different distribution.
188        //
189        // In particular incremental update payloads could be smaller than 1KB,
190        // whereas initial subscription payloads could exceed 10MB.
191        #[buckets(100, 500, 1e3, 10e3, 100e3, 500e3, 1e6, 5e6, 10e6, 25e6, 50e6, 75e6, 100e6, 500e6)]
192        pub websocket_sent_msg_size: HistogramVec,
193
194        #[name = spacetime_websocket_sent_num_rows]
195        #[help = "The number of rows sent to connected sessions"]
196        #[labels(db: Identity, workload: WorkloadType)]
197        // Prometheus histograms have default buckets,
198        // which broadly speaking,
199        // are tailored to measure the response time of a network service.
200        //
201        // Therefore we define specific buckets for this metric,
202        // since it has a different unit and a different distribution.
203        //
204        // In particular incremental updates could have fewer than 10 rows,
205        // whereas initial subscriptions could exceed 100K rows.
206        #[buckets(5, 10, 50, 100, 500, 1e3, 5e3, 10e3, 50e3, 100e3, 250e3, 500e3, 750e3, 1e6, 5e6)]
207        pub websocket_sent_num_rows: HistogramVec,
208
209        #[name = spacetime_websocket_serialize_secs]
210        #[help = "How long it took to serialize and maybe compress an outgoing websocket message"]
211        #[labels(db: Identity)]
212        #[buckets(0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0)]
213        pub websocket_serialize_secs: HistogramVec,
214
215        #[name = spacetime_worker_instance_operation_queue_length]
216        #[help = "Length of the wait queue for access to a module instance."]
217        #[labels(database_identity: Identity)]
218        pub instance_queue_length: IntGaugeVec,
219
220        #[name = spacetime_worker_instance_operation_queue_length_histogram]
221        #[help = "Length of the wait queue for access to a module instance."]
222        #[labels(database_identity: Identity)]
223        // Prometheus histograms have default buckets,
224        // which broadly speaking,
225        // are tailored to measure the response time of a network service.
226        // Hence we need to define specific buckets for queue length.
227        #[buckets(0, 1, 2, 5, 10, 25, 50, 75, 100, 200, 300, 400, 500, 1000)]
228        pub instance_queue_length_histogram: HistogramVec,
229
230        #[name = spacetime_reducer_wait_time_sec]
231        #[help = "The amount of time (in seconds) a reducer spends in the queue waiting to run"]
232        #[labels(db: Identity, reducer: str)]
233        // Prometheus histograms have default buckets,
234        // which broadly speaking,
235        // are tailored to measure the response time of a network service.
236        //
237        // However we expect a different value distribution for this metric.
238        // In particular the smallest bucket value is 5ms by default.
239        // But we expect many wait times to be on the order of microseconds.
240        #[buckets(100e-6, 500e-6, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10)]
241        pub reducer_wait_time: HistogramVec,
242
243        #[name = spacetime_worker_wasm_instance_errors_total]
244        #[help = "The number of fatal WASM instance errors, such as reducer panics."]
245        #[labels(caller_identity: Identity, module_hash: Hash, caller_connection_id: ConnectionId, reducer_symbol: str)]
246        pub wasm_instance_errors: IntCounterVec,
247
248        #[name = spacetime_worker_wasm_memory_bytes]
249        #[help = "The number of bytes of linear memory allocated by the database's WASM module instance"]
250        #[labels(database_identity: Identity)]
251        pub wasm_memory_bytes: IntGaugeVec,
252
253        #[name = spacetime_active_queries]
254        #[help = "The number of active subscription queries"]
255        #[labels(database_identity: Identity)]
256        pub subscription_queries: IntGaugeVec,
257
258        #[name = spacetime_request_round_trip_time]
259        #[help = "The total time it takes for request to complete"]
260        #[labels(txn_type: WorkloadType, database_identity: Identity, reducer_symbol: str)]
261        pub request_round_trip: HistogramVec,
262
263        #[name = spacetime_reducer_plus_query_duration_sec]
264        #[help = "The time spent executing a reducer (in seconds), plus the time spent evaluating its subscription queries"]
265        #[labels(db: Identity, reducer: str)]
266        pub reducer_plus_query_duration: HistogramVec,
267
268        #[name = spacetime_num_bytes_sent_to_clients_total]
269        #[help = "The cumulative number of bytes sent to clients"]
270        #[labels(txn_type: WorkloadType, db: Identity)]
271        pub bytes_sent_to_clients: IntCounterVec,
272
273        #[name = spacetime_subscription_send_queue_length]
274        #[help = "The number of `ComputedQueries` waiting in the queue to be aggregated and broadcast by the `send_worker`"]
275        #[labels(database_identity: Identity)]
276        pub subscription_send_queue_length: IntGaugeVec,
277
278        #[name = spacetime_total_incoming_queue_length]
279        #[help = "The number of client -> server WebSocket messages waiting any client's incoming queue"]
280        #[labels(db: Identity)]
281        pub total_incoming_queue_length: IntGaugeVec,
282
283        #[name = spacetime_total_outgoing_queue_length]
284        #[help = "The number of server -> client WebSocket messages waiting in any client's outgoing queue"]
285        #[labels(db: Identity)]
286        pub total_outgoing_queue_length: IntGaugeVec,
287    }
288);
289
/// Global handle to the worker-level metrics, registered lazily on first access.
pub static WORKER_METRICS: Lazy<WorkerMetrics> = Lazy::new(WorkerMetrics::new);
291
292#[cfg(not(target_env = "msvc"))]
293use tikv_jemalloc_ctl::{epoch, stats};
294
295#[cfg(not(target_env = "msvc"))]
296static SPAWN_JEMALLOC_GUARD: Once = Once::new();
297pub fn spawn_jemalloc_stats(_node_id: String) {
298    #[cfg(not(target_env = "msvc"))]
299    SPAWN_JEMALLOC_GUARD.call_once(|| {
300        spawn(async move {
301            let allocated_bytes = WORKER_METRICS.jemalloc_allocated_bytes.with_label_values(&_node_id);
302            let resident_bytes = WORKER_METRICS.jemalloc_resident_bytes.with_label_values(&_node_id);
303            let active_bytes = WORKER_METRICS.jemalloc_active_bytes.with_label_values(&_node_id);
304
305            let e = epoch::mib().unwrap();
306            loop {
307                e.advance().unwrap();
308
309                let allocated = stats::allocated::read().unwrap();
310                let resident = stats::resident::read().unwrap();
311                let active = stats::active::read().unwrap();
312
313                allocated_bytes.set(allocated as i64);
314                resident_bytes.set(resident as i64);
315                active_bytes.set(active as i64);
316
317                sleep(Duration::from_secs(10)).await;
318            }
319        });
320    });
321}
322
323static SPAWN_PAGE_POOL_GUARD: Once = Once::new();
324pub fn spawn_page_pool_stats(node_id: String, page_pool: PagePool) {
325    SPAWN_PAGE_POOL_GUARD.call_once(|| {
326        spawn(async move {
327            let resident_bytes = WORKER_METRICS.page_pool_resident_bytes.with_label_values(&node_id);
328            let dropped_pages = WORKER_METRICS.page_pool_dropped_pages.with_label_values(&node_id);
329            let new_pages = WORKER_METRICS.page_pool_new_pages_allocated.with_label_values(&node_id);
330            let reused_pages = WORKER_METRICS.page_pool_pages_reused.with_label_values(&node_id);
331            let returned_pages = WORKER_METRICS.page_pool_pages_returned.with_label_values(&node_id);
332
333            loop {
334                resident_bytes.set(page_pool.heap_usage() as i64);
335                dropped_pages.set(page_pool.dropped_count() as i64);
336                new_pages.set(page_pool.new_allocated_count() as i64);
337                reused_pages.set(page_pool.reused_count() as i64);
338                returned_pages.set(page_pool.reused_count() as i64);
339
340                sleep(Duration::from_secs(10)).await;
341            }
342        });
343    });
344}
345
// How frequently to update the tokio stats.
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
const TOKIO_STATS_INTERVAL: Duration = Duration::from_secs(10);
#[cfg(all(target_has_atomic = "64", tokio_unstable))]
static SPAWN_TOKIO_STATS_GUARD: Once = Once::new();
/// Spawns a background task that periodically publishes tokio runtime metrics,
/// labeled with `node_id`.
///
/// Requires a build with 64-bit atomics and `--cfg tokio_unstable`; otherwise
/// this only logs a warning and spawns nothing. Guarded by a `Once`, so only the
/// first call ever spawns the sampler; later calls (including ones with a
/// different node id) are no-ops.
pub fn spawn_tokio_stats(node_id: String) {
    // Some of these metrics could still be reported without these settings,
    // but it is simpler to just skip all the tokio metrics if they aren't set.

    #[cfg(not(all(target_has_atomic = "64", tokio_unstable)))]
    log::warn!("Skipping tokio metrics for {node_id}, as they are not enabled in this build.");

    #[cfg(all(target_has_atomic = "64", tokio_unstable))]
    SPAWN_TOKIO_STATS_GUARD.call_once(|| {
        spawn(async move {
            // Set up our metric handles, so we don't keep calling `with_label_values`.
            let num_worker_metric = WORKER_METRICS.tokio_num_workers.with_label_values(&node_id);
            let num_blocking_threads_metric = WORKER_METRICS.tokio_num_blocking_threads.with_label_values(&node_id);
            let num_alive_tasks_metric = WORKER_METRICS.tokio_num_alive_tasks.with_label_values(&node_id);
            let global_queue_depth_metric = WORKER_METRICS.tokio_global_queue_depth.with_label_values(&node_id);
            let num_idle_blocking_threads_metric = WORKER_METRICS
                .tokio_num_idle_blocking_threads
                .with_label_values(&node_id);
            let blocking_queue_depth_metric = WORKER_METRICS.tokio_blocking_queue_depth.with_label_values(&node_id);
            let spawned_tasks_count_metric = WORKER_METRICS.tokio_spawned_tasks_count.with_label_values(&node_id);
            let remote_schedule_count_metric = WORKER_METRICS.tokio_remote_schedule_count.with_label_values(&node_id);

            let local_queue_depth_total_metric =
                WORKER_METRICS.tokio_local_queue_depth_total.with_label_values(&node_id);
            let local_queue_depth_max_metric = WORKER_METRICS.tokio_local_queue_depth_max.with_label_values(&node_id);
            let local_queue_depth_min_metric = WORKER_METRICS.tokio_local_queue_depth_min.with_label_values(&node_id);
            let steal_total_metric = WORKER_METRICS.tokio_steal_total.with_label_values(&node_id);
            let steal_operations_total_metric = WORKER_METRICS.tokio_steal_operations_total.with_label_values(&node_id);
            let local_schedule_total_metric = WORKER_METRICS.tokio_local_schedule_total.with_label_values(&node_id);
            let overflow_total_metric = WORKER_METRICS.tokio_overflow_total.with_label_values(&node_id);
            let busy_ratio_min_metric = WORKER_METRICS.tokio_busy_ratio_min.with_label_values(&node_id);
            let busy_ratio_max_metric = WORKER_METRICS.tokio_busy_ratio_max.with_label_values(&node_id);
            let busy_ratio_avg_metric = WORKER_METRICS.tokio_busy_ratio_avg.with_label_values(&node_id);
            let mean_polls_per_park_metric = WORKER_METRICS.tokio_mean_polls_per_park.with_label_values(&node_id);

            let handle = tokio::runtime::Handle::current();
            // The tokio_metrics library gives us some helpers for aggregating per-worker metrics.
            let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
            let mut intervals = runtime_monitor.intervals();
            loop {
                let metrics = tokio::runtime::Handle::current().metrics();
                // Per-interval aggregates since the previous iteration; `None` is
                // handled below by skipping the interval-based metrics this tick.
                let interval_delta = intervals.next();

                // Point-in-time gauges read directly from the runtime handle.
                num_worker_metric.set(metrics.num_workers() as i64);
                num_alive_tasks_metric.set(metrics.num_alive_tasks() as i64);
                global_queue_depth_metric.set(metrics.global_queue_depth() as i64);
                num_blocking_threads_metric.set(metrics.num_blocking_threads() as i64);
                num_idle_blocking_threads_metric.set(metrics.num_idle_blocking_threads() as i64);
                blocking_queue_depth_metric.set(metrics.blocking_queue_depth() as i64);

                // The spawned tasks count and remote schedule count are cumulative,
                // so we need to increment them by the difference from the last value.
                {
                    let current_count = metrics.spawned_tasks_count();
                    let previous_value = spawned_tasks_count_metric.get();
                    // The tokio metric should be monotonically increasing, but we are checking just in case.
                    if let Some(diff) = current_count.checked_sub(previous_value) {
                        spawned_tasks_count_metric.inc_by(diff);
                    }
                }
                {
                    let current_count = metrics.remote_schedule_count();
                    let previous_value = remote_schedule_count_metric.get();
                    // The tokio metric should be monotonically increasing, but we are checking just in case.
                    if let Some(diff) = current_count.checked_sub(previous_value) {
                        remote_schedule_count_metric.inc_by(diff);
                    }
                }

                if let Some(interval_delta) = interval_delta {
                    // Queue depths are gauges (snapshots); steal/schedule/overflow
                    // counts are per-interval deltas folded into counters.
                    local_queue_depth_total_metric.set(interval_delta.total_local_queue_depth as i64);
                    local_queue_depth_max_metric.set(interval_delta.max_local_queue_depth as i64);
                    local_queue_depth_min_metric.set(interval_delta.min_local_queue_depth as i64);
                    steal_total_metric.inc_by(interval_delta.total_steal_count);
                    steal_operations_total_metric.inc_by(interval_delta.total_steal_operations);
                    local_schedule_total_metric.inc_by(interval_delta.total_local_schedule_count);
                    overflow_total_metric.inc_by(interval_delta.total_overflow_count);
                    mean_polls_per_park_metric.set(interval_delta.mean_polls_per_park());

                    // This is mostly to make sure we don't divide by zero, but we also want to skip the first interval if it is very short.
                    if interval_delta.elapsed.as_millis() > 100 {
                        busy_ratio_avg_metric.set(interval_delta.busy_ratio());
                        // Min/max busy ratios are computed by hand: the least/most
                        // busy worker's busy time divided by wall-clock elapsed time.
                        busy_ratio_min_metric.set(
                            interval_delta.min_busy_duration.as_nanos() as f64
                                / interval_delta.elapsed.as_nanos() as f64,
                        );
                        busy_ratio_max_metric.set(
                            interval_delta.max_busy_duration.as_nanos() as f64
                                / interval_delta.elapsed.as_nanos() as f64,
                        );
                    }
                }
                sleep(TOKIO_STATS_INTERVAL).await;
            }
        });
    });
}