Skip to main content

nodedb_bridge/
wfq_metrics.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-virtual-queue depth and backpressure counters.
4//!
5//! These counters are incremented by the dispatch layer as databases cross the
6//! 85% (throttle) and 95% (suspend) thresholds of their fair-share WFQ slot
7//! allocation. The `/metrics` endpoint exposure is wired in a later pass; the
8//! counters themselves are authoritative from this point.
9//!
10//! # Concurrency
11//!
12//! Writers are Data Plane cores recording threshold transitions on the
13//! dispatch hot path; readers are the Tokio metrics exporter. To keep the
14//! hot path lock-free after the first observation of a given `database_id`,
15//! the map is wrapped in an `RwLock<HashMap<u64, Arc<DbCounters>>>`:
16//!
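//! - **Steady state**: a read lock is acquired, the `Arc<DbCounters>` is
//!   cloned out, and the lock is dropped before the atomic `fetch_add`.
//!   Recording against an existing entry never takes the write lock, so it
//!   never blocks a reader; only a first-time insert for some other
//!   `database_id` (next bullet) briefly does.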
20//! - **First-time entry**: the read lock is dropped, a write lock is taken,
21//!   the entry is double-checked, then inserted. This happens at most once
22//!   per `database_id` over the lifetime of the process.
23//!
24//! The counters themselves are `AtomicU64` so the increment after
25//! `Arc::clone` is genuinely lock-free.
26
27use std::collections::HashMap;
28use std::sync::atomic::{AtomicU64, Ordering};
29use std::sync::{Arc, RwLock};
30
/// Per-database throttle/suspend event counters for one Data Plane core.
///
/// Entries are created lazily the first time an event is recorded for a
/// `database_id`. Reading a database that was never recorded returns `0`
/// and does not create an entry.
///
/// Lock poisoning is deliberately ignored: every lock acquisition recovers
/// the guard from the `PoisonError`, so a panic on another thread never
/// disables metrics collection.
#[derive(Debug, Default)]
pub struct VirtualQueueMetrics {
    /// Keyed by `database_id`. Lazily created.
    inner: RwLock<HashMap<u64, Arc<DbCounters>>>,
}

/// Event counters for a single database. Both counters start at zero.
#[derive(Debug, Default)]
struct DbCounters {
    /// Number of times this DB's virtual queue crossed the 85% throttle threshold.
    throttle_events: AtomicU64,
    /// Number of times this DB's virtual queue crossed the 95% suspend threshold.
    suspend_events: AtomicU64,
}

impl VirtualQueueMetrics {
    /// Create an empty metrics table with no databases registered.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a throttle event (virtual queue crossed 85% of fair share).
    ///
    /// Creates the entry for `database_id` if this is its first event.
    pub fn record_throttle(&self, database_id: u64) {
        // Relaxed is sufficient: the counter is a standalone statistic and is
        // not used to publish or order any other memory.
        self.counters(database_id)
            .throttle_events
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Record a suspend event (virtual queue crossed 95% of fair share).
    ///
    /// Creates the entry for `database_id` if this is its first event.
    pub fn record_suspend(&self, database_id: u64) {
        self.counters(database_id)
            .suspend_events
            .fetch_add(1, Ordering::Relaxed);
    }

    /// Total throttle events recorded for a database, or `0` if none were
    /// ever recorded for `database_id`.
    #[must_use]
    pub fn throttle_events(&self, database_id: u64) -> u64 {
        self.read_counter(database_id, |c| &c.throttle_events)
    }

    /// Total suspend events recorded for a database, or `0` if none were
    /// ever recorded for `database_id`.
    #[must_use]
    pub fn suspend_events(&self, database_id: u64) -> u64 {
        self.read_counter(database_id, |c| &c.suspend_events)
    }

    // ── Private helpers ───────────────────────────────────────────────────────

    /// Load one counter of `database_id` under the read lock.
    ///
    /// `field` selects which counter of the entry to read. A missing entry
    /// yields `0`; the map is never modified by a read.
    fn read_counter(
        &self,
        database_id: u64,
        field: impl FnOnce(&DbCounters) -> &AtomicU64,
    ) -> u64 {
        let map = self.inner.read().unwrap_or_else(|poisoned| poisoned.into_inner());
        map.get(&database_id)
            .map_or(0, |c| field(c).load(Ordering::Relaxed))
    }

    /// Return the `Arc<DbCounters>` for `database_id`, creating it on first
    /// observation. Steady-state path takes only a read lock.
    fn counters(&self, database_id: u64) -> Arc<DbCounters> {
        // Fast path: the entry already exists, so a shared read lock is
        // enough. The guard is confined to this block so that it is released
        // before the write lock below is requested.
        {
            let map = self.inner.read().unwrap_or_else(|poisoned| poisoned.into_inner());
            if let Some(existing) = map.get(&database_id) {
                return Arc::clone(existing);
            }
        }
        // Slow path: first observation of this database. Another thread may
        // have inserted the entry between dropping the read lock and taking
        // the write lock; the entry API re-checks under the write lock, so
        // exactly one `DbCounters` is ever created per `database_id`.
        let mut map = self.inner.write().unwrap_or_else(|poisoned| poisoned.into_inner());
        Arc::clone(map.entry(database_id).or_default())
    }
}
121
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Run `record` `per_worker` times on each of `workers` threads that all
    /// share one fresh metrics instance, wait for every thread to finish, and
    /// hand the instance back for assertions.
    fn run_workers(
        workers: usize,
        per_worker: usize,
        record: fn(&VirtualQueueMetrics),
    ) -> Arc<VirtualQueueMetrics> {
        let shared = Arc::new(VirtualQueueMetrics::new());
        // `collect` spawns every thread before the first join, so the
        // workers genuinely overlap.
        let joins: Vec<_> = (0..workers)
            .map(|_| {
                let metrics = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..per_worker {
                        record(&metrics);
                    }
                })
            })
            .collect();
        for join in joins {
            join.join().unwrap();
        }
        shared
    }

    /// Single-threaded round trip: recorded events are read back per
    /// database, and an unseen database reads as zero.
    #[test]
    fn record_then_read() {
        let metrics = VirtualQueueMetrics::new();
        for _ in 0..2 {
            metrics.record_throttle(7);
        }
        metrics.record_suspend(7);

        assert_eq!(metrics.throttle_events(7), 2);
        assert_eq!(metrics.suspend_events(7), 1);
        assert_eq!(metrics.throttle_events(99), 0);
    }

    /// Eight threads incrementing the same database must not lose updates.
    #[test]
    fn concurrent_writers_share_atomic_counter() {
        let metrics = run_workers(8, 1000, |m| m.record_throttle(42));
        assert_eq!(metrics.throttle_events(42), 8 * 1000);
    }

    /// All 8 threads race to be the first observer of the same DB ID; every
    /// one of their events must land on the single shared entry.
    #[test]
    fn concurrent_first_observation_does_not_lose_counts() {
        let metrics = run_workers(8, 1, |m| m.record_suspend(123));
        assert_eq!(metrics.suspend_events(123), 8);
    }
}
173}