nodedb_bridge/wfq.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Weighted-fair queue for per-database SPSC bridge dispatch.
4//!
5//! Implements deficit round-robin (DRR) across per-database virtual sub-queues.
6//! Each database receives a quantum proportional to its `PriorityClass`:
7//!
8//! - `Critical` → weight 4
9//! - `Standard` → weight 2 (default)
10//! - `Bulk` → weight 1
11//!
12//! The `cache_weight` field on `QuotaRecord` governs the *doc cache* shard
13//! size (Section E); dispatch quantum uses `PriorityClass` only, not
14//! `cache_weight`.
15//!
16//! # Backpressure
17//!
18//! Per-virtual-queue thresholds are computed against each database's fair
19//! share of total capacity:
20//!
21//! - ≥ 85% of fair share → throttled
22//! - ≥ 95% of fair share → suspended
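//!
//! These thresholds are advisory: `try_enqueue` only rejects an item once the
//! *total* capacity is exhausted, and callers consult `is_throttled_for` /
//! `is_suspended_for` to apply per-database backpressure. Because each
//! database is measured against its own fair share, a database at its
//! threshold does not block other databases that still have headroom.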
26
27use std::collections::{HashMap, VecDeque};
28
29use nodedb_types::PriorityClass;
30
31/// Dispatch weight for a given priority class.
32///
33/// Critical gets 4×, Standard 2×, Bulk 1× the basic quantum.
34pub fn priority_weight(cls: PriorityClass) -> u32 {
35 match cls {
36 PriorityClass::Critical => 4,
37 PriorityClass::Standard => 2,
38 PriorityClass::Bulk => 1,
39 }
40}
41
/// One database's slice of the weighted-fair queue: its pending items plus
/// the DRR credit remaining in its current turn.
struct VirtualQueue<T> {
    /// Pending items for this database, oldest at the front.
    items: VecDeque<T>,
    /// Dequeues still allowed in this database's current turn. The scheduler
    /// refills it only after it reaches zero, and clears it when the queue
    /// runs dry so an idle database cannot bank credit.
    deficit: u32,
}

impl<T> VirtualQueue<T> {
    /// An empty virtual queue with no outstanding credit.
    fn new() -> Self {
        let items = VecDeque::new();
        VirtualQueue { items, deficit: 0 }
    }
}
58
59/// Weighted-fair queue, parameterized over item type `T`.
60///
61/// Maintains one virtual sub-queue per active `database_id`. The dispatcher
62/// calls `pop_next()` to retrieve the next item following deficit round-robin
63/// ordering. Total capacity across all virtual queues is bounded; `try_enqueue`
64/// returns `Err(item)` when the total is full.
65pub struct WeightedFairQueue<T> {
66 /// Per-database virtual queues. Created lazily on first enqueue.
67 queues: HashMap<u64, VirtualQueue<T>>,
68
69 /// Round-robin traversal order (stable insertion order for existing DBs).
70 db_order: VecDeque<u64>,
71
72 /// Total number of items across all virtual queues.
73 total: usize,
74
75 /// Hard cap on total items across all virtual queues.
76 capacity: usize,
77
78 /// Per-database priority class, consulted during `pop_next`.
79 priorities: HashMap<u64, PriorityClass>,
80
81 /// Number of `pop_next` calls since the last queue was reaped; used to
82 /// garbage-collect drained virtual queues lazily.
83 pops_since_reap: usize,
84
85 /// Reap empty virtual queues after this many pop attempts without activity.
86 reap_after_pops: usize,
87}
88
89impl<T> WeightedFairQueue<T> {
90 /// Create a new weighted-fair queue with the given total capacity and reap
91 /// threshold. `reap_after_pops` controls how many empty queues persist
92 /// after draining before being garbage-collected.
93 pub fn new(capacity: usize, reap_after_pops: usize) -> Self {
94 Self {
95 queues: HashMap::new(),
96 db_order: VecDeque::new(),
97 total: 0,
98 capacity,
99 priorities: HashMap::new(),
100 pops_since_reap: 0,
101 reap_after_pops,
102 }
103 }
104
105 /// Attempt to enqueue `item` for `database_id`. Returns `Err(item)` if the
106 /// total queue has reached capacity.
107 pub fn try_enqueue(&mut self, database_id: u64, item: T) -> Result<(), T> {
108 if self.total >= self.capacity {
109 return Err(item);
110 }
111 if let std::collections::hash_map::Entry::Vacant(e) = self.queues.entry(database_id) {
112 e.insert(VirtualQueue::new());
113 self.db_order.push_back(database_id);
114 }
115 // Safe: we just ensured the key exists.
116 let vq = self.queues.get_mut(&database_id).expect("just inserted");
117 vq.items.push_back(item);
118 self.total += 1;
119 Ok(())
120 }
121
122 /// Set (or update) the priority class for a database. Applied on the next
123 /// `pop_next` call after this update.
124 pub fn set_priority(&mut self, database_id: u64, cls: PriorityClass) {
125 self.priorities.insert(database_id, cls);
126 }
127
128 /// Pop the next item using deficit round-robin across all virtual queues.
129 ///
130 /// Returns `None` if all virtual queues are empty.
131 ///
132 /// Each database is served for up to `priority_weight(class)` consecutive
133 /// items before the scheduler rotates to the next database. Deficit credits
134 /// are added once per turn (when a DB's deficit reaches zero and it re-enters
135 /// the front of the rotation) and carried across calls so databases with
136 /// lower arrival rates still accumulate credits fairly.
137 pub fn pop_next(&mut self) -> Option<T> {
138 if self.total == 0 {
139 return None;
140 }
141
142 // Walk the round-robin ring. We may need to skip empty queues, so we
143 // bound the scan to at most `n` DB rotations to avoid an infinite loop
144 // when all but one queue is empty.
145 let n = self.db_order.len();
146 for _ in 0..n {
147 let db_id = match self.db_order.front().copied() {
148 Some(id) => id,
149 None => break,
150 };
151
152 let vq = match self.queues.get_mut(&db_id) {
153 Some(vq) => vq,
154 None => {
155 self.db_order.pop_front();
156 continue;
157 }
158 };
159
160 // If this DB has no deficit remaining from its previous turn, grant
161 // a new quantum now (beginning of a new turn for this DB).
162 if vq.deficit == 0 {
163 let cls = self.priorities.get(&db_id).copied().unwrap_or_default();
164 vq.deficit = priority_weight(cls);
165 }
166
167 if let Some(item) = vq.items.pop_front() {
168 vq.deficit -= 1;
169 self.total -= 1;
170
171 // If this DB's deficit is now exhausted, rotate it to the back
172 // so the next DB gets its turn. Otherwise leave it at the front
173 // so we keep draining it next call.
174 if vq.deficit == 0 {
175 self.db_order.pop_front();
176 self.db_order.push_back(db_id);
177 }
178
179 self.pops_since_reap += 1;
180 if self.pops_since_reap >= self.reap_after_pops {
181 self.reap_empty_queues();
182 self.pops_since_reap = 0;
183 }
184 return Some(item);
185 } else {
186 // Queue drained; reset deficit so credits don't accumulate
187 // unboundedly for an inactive DB, then rotate to next.
188 vq.deficit = 0;
189 self.db_order.pop_front();
190 self.db_order.push_back(db_id);
191 }
192 }
193 None
194 }
195
196 /// Number of items queued for a specific database.
197 pub fn depth_for(&self, database_id: u64) -> usize {
198 self.queues
199 .get(&database_id)
200 .map(|vq| vq.items.len())
201 .unwrap_or(0)
202 }
203
204 /// Total items across all virtual queues.
205 pub fn total_depth(&self) -> usize {
206 self.total
207 }
208
209 /// Returns `true` if the given database's virtual queue has reached ≥ 85%
210 /// of its fair share of total capacity.
211 ///
212 /// Fair share = `capacity / active_databases` (floor division, min 1).
213 /// Databases with higher priority class receive proportionally more fair
214 /// share in the weight sense but the *slot* fair share is still equal
215 /// (per-DB slot pressure uses equal division to avoid one class starving
216 /// another's absolute headroom).
217 pub fn is_throttled_for(&self, database_id: u64) -> bool {
218 let depth = self.depth_for(database_id);
219 let fair_share = self.fair_share_slots();
220 depth * 100 >= fair_share * 85
221 }
222
223 /// Returns `true` if the given database's virtual queue has reached ≥ 95%
224 /// of its fair share of total capacity.
225 pub fn is_suspended_for(&self, database_id: u64) -> bool {
226 let depth = self.depth_for(database_id);
227 let fair_share = self.fair_share_slots();
228 depth * 100 >= fair_share * 95
229 }
230
231 /// Number of active virtual queues (including empty, not-yet-reaped ones).
232 pub fn active_database_count(&self) -> usize {
233 self.queues.len()
234 }
235
236 // ── Private helpers ───────────────────────────────────────────────────────
237
238 /// Slots allocated per database for fair-share computations.
239 ///
240 /// Active count is the number of databases that have been explicitly
241 /// registered (via `set_priority`) or have an active virtual queue.
242 /// This prevents the fair-share from inflating when only one DB has
243 /// enqueued items but multiple DBs are known to the scheduler.
244 fn fair_share_slots(&self) -> usize {
245 let active = self.priorities.len().max(self.queues.len()).max(1);
246 (self.capacity / active).max(1)
247 }
248
249 /// Remove virtual queues that have been empty for a full reap cycle.
250 fn reap_empty_queues(&mut self) {
251 let empty_ids: Vec<u64> = self
252 .queues
253 .iter()
254 .filter(|(_, vq)| vq.items.is_empty())
255 .map(|(&id, _)| id)
256 .collect();
257 for id in empty_ids {
258 self.queues.remove(&id);
259 self.db_order.retain(|&x| x != id);
260 }
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn single_db_behaves_like_fifo() {
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(64, 100);
        for i in 0..8u32 {
            wfq.try_enqueue(1, i).unwrap();
        }
        for i in 0..8u32 {
            assert_eq!(wfq.pop_next(), Some(i));
        }
        assert_eq!(wfq.pop_next(), None);
    }

    #[test]
    fn two_dbs_equal_priority_round_robin() {
        let mut wfq: WeightedFairQueue<(u64, u32)> = WeightedFairQueue::new(64, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Enqueue 4 items each.
        for i in 0..4u32 {
            wfq.try_enqueue(1, (1, i)).unwrap();
            wfq.try_enqueue(2, (2, i)).unwrap();
        }

        let mut db1_count = 0u32;
        let mut db2_count = 0u32;
        while let Some((db, _)) = wfq.pop_next() {
            match db {
                1 => db1_count += 1,
                2 => db2_count += 1,
                _ => panic!("unexpected db"),
            }
        }
        // Equal priority → equal share.
        assert_eq!(db1_count, 4);
        assert_eq!(db2_count, 4);
    }

    #[test]
    fn critical_drains_roughly_4x_faster_than_bulk() {
        let mut wfq: WeightedFairQueue<(u64, u32)> = WeightedFairQueue::new(256, 1000);
        wfq.set_priority(1, PriorityClass::Critical);
        wfq.set_priority(2, PriorityClass::Bulk);

        // Enqueue 80 items each.
        for i in 0..80u32 {
            wfq.try_enqueue(1, (1, i)).unwrap();
            wfq.try_enqueue(2, (2, i)).unwrap();
        }

        // Pop the first 20 items and count by DB. Both queues hold far more
        // than 20 items, so an empty pop or an unknown DB is a bug.
        let mut critical_count = 0u32;
        let mut bulk_count = 0u32;
        for _ in 0..20 {
            match wfq.pop_next() {
                Some((1, _)) => critical_count += 1,
                Some((2, _)) => bulk_count += 1,
                other => panic!("unexpected pop result: {:?}", other),
            }
        }
        // Critical weight=4, Bulk weight=1 → ratio ≥ 3:1 in first 20 pops.
        assert!(
            critical_count >= 3 * bulk_count,
            "critical={critical_count} bulk={bulk_count}: expected ≥ 3:1 ratio"
        );
    }

    #[test]
    fn drr_order_follows_weights_exactly() {
        let mut wfq: WeightedFairQueue<(u64, u32)> = WeightedFairQueue::new(64, 100);
        wfq.set_priority(1, PriorityClass::Critical);
        wfq.set_priority(2, PriorityClass::Bulk);
        for i in 0..10u32 {
            wfq.try_enqueue(1, (1, i)).unwrap();
            wfq.try_enqueue(2, (2, i)).unwrap();
        }

        // Critical serves 4 in a row, then Bulk serves 1, and so on.
        let order: Vec<u64> = (0..10)
            .map(|_| wfq.pop_next().expect("items remain").0)
            .collect();
        assert_eq!(order, vec![1, 1, 1, 1, 2, 1, 1, 1, 1, 2]);
    }

    #[test]
    fn saturated_db_a_does_not_block_db_b() {
        let capacity = 8;
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(capacity, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Fill up fair share for DB 1 (4 out of 8 slots).
        for i in 0..4u32 {
            wfq.try_enqueue(1, i).unwrap();
        }

        // DB 1 is at its fair share; DB 2 still has all of its headroom.
        assert!(wfq.is_suspended_for(1));
        assert!(!wfq.is_throttled_for(2));

        // DB 2 should still be enqueueable.
        for i in 0..4u32 {
            assert!(
                wfq.try_enqueue(2, i).is_ok(),
                "DB 2 enqueue {i} should succeed while DB 1 occupies its fair share"
            );
        }
        assert_eq!(wfq.total_depth(), 8);
    }

    #[test]
    fn bound_total_never_exceeded() {
        let capacity = 4;
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(capacity, 100);

        // Fill to capacity across two databases.
        for i in 0..2u32 {
            wfq.try_enqueue(1, i).unwrap();
            wfq.try_enqueue(2, i).unwrap();
        }
        assert_eq!(wfq.total_depth(), capacity);

        // Next enqueue must fail regardless of which DB.
        assert!(wfq.try_enqueue(1, 99).is_err());
        assert!(wfq.try_enqueue(2, 99).is_err());
        assert!(wfq.try_enqueue(3, 99).is_err());
    }

    #[test]
    fn backpressure_thresholds_per_virtual_queue() {
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(8, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Fair share = 8 / 2 = 4 per DB.
        // Push 3 items into DB 1 → 3/4 = 75% → not throttled.
        for _ in 0..3 {
            wfq.try_enqueue(1, 0).unwrap();
        }
        assert!(!wfq.is_throttled_for(1));
        assert!(!wfq.is_suspended_for(1));

        // Push 1 more → 4/4 = 100% → throttled AND suspended.
        wfq.try_enqueue(1, 0).unwrap();
        assert!(wfq.is_throttled_for(1));
        assert!(wfq.is_suspended_for(1));

        // DB 2 is untouched → not throttled.
        assert!(!wfq.is_throttled_for(2));
    }

    #[test]
    fn drained_queues_are_reaped_and_recreated() {
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(8, 1);
        wfq.try_enqueue(1, 10).unwrap();
        wfq.try_enqueue(2, 20).unwrap();
        assert_eq!(wfq.active_database_count(), 2);

        // reap_after_pops = 1 → every successful pop drops drained queues.
        assert_eq!(wfq.pop_next(), Some(10));
        assert_eq!(wfq.active_database_count(), 1);
        assert_eq!(wfq.pop_next(), Some(20));
        assert_eq!(wfq.active_database_count(), 0);
        assert_eq!(wfq.pop_next(), None);

        // A reaped database gets a fresh virtual queue on its next enqueue.
        wfq.try_enqueue(1, 11).unwrap();
        assert_eq!(wfq.active_database_count(), 1);
        assert_eq!(wfq.pop_next(), Some(11));
    }
}