nodedb_bridge/wfq_metrics.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-virtual-queue depth and backpressure counters.
4//!
5//! These counters are incremented by the dispatch layer as databases cross the
6//! 85% (throttle) and 95% (suspend) thresholds of their fair-share WFQ slot
7//! allocation. The `/metrics` endpoint exposure is wired in a later pass; the
8//! counters themselves are authoritative from this point.
9//!
10//! # Concurrency
11//!
//! Writers are Data Plane cores recording threshold transitions on the
//! dispatch hot path; readers are the Tokio metrics exporter. The map is
//! wrapped in an `RwLock<HashMap<u64, Arc<DbCounters>>>`:
//!
//! - **Steady state**: once an entry exists for a `database_id`, recording an
//!   event takes only the shared read lock, so recorders never block one
//!   another. This is cheap but not lock-free: recorders and readers still
//!   wait while a writer holds the write lock (and, depending on the
//!   platform's `RwLock` priority policy, possibly while one is queued).
//! - **First-time entry**: the read lock is dropped, a write lock is taken,
//!   the entry is double-checked, then inserted. This happens at most once
//!   per `database_id` over the lifetime of the process, but while it runs it
//!   blocks recorders and readers of *every* database, not just the new one.
//!
//! The counters themselves are `AtomicU64`, so concurrent increments need no
//! synchronisation beyond the shared read lock.
26
27use std::collections::HashMap;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::{Arc, RwLock};
30
/// Per-database throttle/suspend event counters for one Data Plane core.
///
/// Entries are created lazily on the first recorded event for a
/// `database_id` and are never removed.
#[derive(Debug, Default)]
pub struct VirtualQueueMetrics {
    /// Keyed by `database_id`. Lazily created; never pruned.
    ///
    /// NOTE(review): the record and read paths below only touch entries while
    /// the lock is held, so the `Arc` wrapper is no longer load-bearing. It
    /// could be dropped together with the `Arc` import.
    inner: RwLock<HashMap<u64, Arc<DbCounters>>>,
}

/// Event counters for a single database's virtual queue; both start at zero.
#[derive(Debug, Default)]
struct DbCounters {
    /// Number of times this DB's virtual queue crossed the 85% throttle threshold.
    throttle_events: AtomicU64,
    /// Number of times this DB's virtual queue crossed the 95% suspend threshold.
    suspend_events: AtomicU64,
}

impl VirtualQueueMetrics {
    /// Create an empty table with no per-database entries.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a throttle event (virtual queue crossed 85% of fair share).
    pub fn record_throttle(&self, database_id: u64) {
        // Relaxed: each counter is an independent monotonic tally; no other
        // memory is published through it.
        self.with_counters(database_id, |c| {
            c.throttle_events.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// Record a suspend event (virtual queue crossed 95% of fair share).
    pub fn record_suspend(&self, database_id: u64) {
        self.with_counters(database_id, |c| {
            c.suspend_events.fetch_add(1, Ordering::Relaxed);
        });
    }

    /// Total throttle events recorded for a database; 0 if it has none.
    #[must_use]
    pub fn throttle_events(&self, database_id: u64) -> u64 {
        self.read_counter(database_id, |c| &c.throttle_events)
    }

    /// Total suspend events recorded for a database; 0 if it has none.
    #[must_use]
    pub fn suspend_events(&self, database_id: u64) -> u64 {
        self.read_counter(database_id, |c| &c.suspend_events)
    }

    // ── Private helpers ───────────────────────────────────────────────────────

    /// Run `f` on the counters for `database_id`, creating them on first
    /// observation.
    ///
    /// `f` runs while the map lock is held (shared on the fast path, exclusive
    /// on the insert path). Running it in place avoids cloning the `Arc` out of
    /// the map. That clone and its drop would be two extra contended atomic
    /// read-modify-writes per event, compared with the single `fetch_add`
    /// that `f` performs.
    fn with_counters(&self, database_id: u64, f: impl FnOnce(&DbCounters)) {
        // Fast path: entry exists, only a shared read lock is needed.
        {
            let guard = self.inner.read().unwrap_or_else(|p| p.into_inner());
            if let Some(counters) = guard.get(&database_id) {
                f(counters);
                return;
            }
        }
        // Slow path: first observation for this database. Another thread may
        // have inserted it between dropping the read lock and acquiring the
        // write lock, so the entry API re-checks before inserting.
        let mut guard = self.inner.write().unwrap_or_else(|p| p.into_inner());
        let counters: &DbCounters = guard.entry(database_id).or_default();
        f(counters);
    }

    /// Load the counter selected by `field` for `database_id`, or 0 if the
    /// database has never recorded an event.
    fn read_counter(
        &self,
        database_id: u64,
        field: impl FnOnce(&DbCounters) -> &AtomicU64,
    ) -> u64 {
        let guard = self.inner.read().unwrap_or_else(|p| p.into_inner());
        guard
            .get(&database_id)
            .map_or(0, |c| field(c).load(Ordering::Relaxed))
    }
}
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use std::sync::Arc;
126 use std::thread;
127
128 #[test]
129 fn record_then_read() {
130 let m = VirtualQueueMetrics::new();
131 m.record_throttle(7);
132 m.record_throttle(7);
133 m.record_suspend(7);
134 assert_eq!(m.throttle_events(7), 2);
135 assert_eq!(m.suspend_events(7), 1);
136 assert_eq!(m.throttle_events(99), 0);
137 }
138
139 #[test]
140 fn concurrent_writers_share_atomic_counter() {
141 let m = Arc::new(VirtualQueueMetrics::new());
142 let mut handles = Vec::new();
143 for _ in 0..8 {
144 let m = Arc::clone(&m);
145 handles.push(thread::spawn(move || {
146 for _ in 0..1000 {
147 m.record_throttle(42);
148 }
149 }));
150 }
151 for h in handles {
152 h.join().unwrap();
153 }
154 assert_eq!(m.throttle_events(42), 8 * 1000);
155 }
156
157 #[test]
158 fn concurrent_first_observation_does_not_lose_counts() {
159 // All 8 threads race to be the first observer of the same DB ID.
160 let m = Arc::new(VirtualQueueMetrics::new());
161 let mut handles = Vec::new();
162 for _ in 0..8 {
163 let m = Arc::clone(&m);
164 handles.push(thread::spawn(move || {
165 m.record_suspend(123);
166 }));
167 }
168 for h in handles {
169 h.join().unwrap();
170 }
171 assert_eq!(m.suspend_events(123), 8);
172 }
173}