Skip to main content

nodedb_bridge/
wfq.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Weighted-fair queue for per-database SPSC bridge dispatch.
4//!
5//! Implements deficit round-robin (DRR) across per-database virtual sub-queues.
6//! Each database receives a quantum proportional to its `PriorityClass`:
7//!
8//! - `Critical` → weight 4
9//! - `Standard` → weight 2  (default)
10//! - `Bulk`     → weight 1
11//!
12//! The `cache_weight` field on `QuotaRecord` governs the *doc cache* shard
13//! size (Section E); dispatch quantum uses `PriorityClass` only, not
14//! `cache_weight`.
15//!
16//! # Backpressure
17//!
18//! Per-virtual-queue thresholds are computed against each database's fair
19//! share of total capacity:
20//!
21//! - ≥ 85% of fair share → throttled
22//! - ≥ 95% of fair share → suspended
23//!
24//! A database at its threshold does not block other databases that have
25//! remaining headroom.
26
27use std::collections::{HashMap, VecDeque};
28
29use nodedb_types::PriorityClass;
30
31/// Dispatch weight for a given priority class.
32///
33/// Critical gets 4×, Standard 2×, Bulk 1× the basic quantum.
34pub fn priority_weight(cls: PriorityClass) -> u32 {
35    match cls {
36        PriorityClass::Critical => 4,
37        PriorityClass::Standard => 2,
38        PriorityClass::Bulk => 1,
39    }
40}
41
/// One database's virtual sub-queue: its pending items plus the
/// deficit-round-robin bookkeeping for that database.
struct VirtualQueue<T> {
    /// Pending items in arrival order (oldest at the front).
    items: VecDeque<T>,
    /// Dispatch credit this database still holds; unused credit is carried
    /// forward rather than discarded after each dispatched item.
    deficit: u32,
}

impl<T> VirtualQueue<T> {
    /// Creates an empty sub-queue holding no items and no dispatch credit.
    fn new() -> Self {
        let items = VecDeque::new();
        VirtualQueue { items, deficit: 0 }
    }
}
58
59/// Weighted-fair queue, parameterized over item type `T`.
60///
61/// Maintains one virtual sub-queue per active `database_id`. The dispatcher
62/// calls `pop_next()` to retrieve the next item following deficit round-robin
63/// ordering. Total capacity across all virtual queues is bounded; `try_enqueue`
64/// returns `Err(item)` when the total is full.
65pub struct WeightedFairQueue<T> {
66    /// Per-database virtual queues. Created lazily on first enqueue.
67    queues: HashMap<u64, VirtualQueue<T>>,
68
69    /// Round-robin traversal order (stable insertion order for existing DBs).
70    db_order: VecDeque<u64>,
71
72    /// Total number of items across all virtual queues.
73    total: usize,
74
75    /// Hard cap on total items across all virtual queues.
76    capacity: usize,
77
78    /// Per-database priority class, consulted during `pop_next`.
79    priorities: HashMap<u64, PriorityClass>,
80
81    /// Number of `pop_next` calls since the last queue was reaped; used to
82    /// garbage-collect drained virtual queues lazily.
83    pops_since_reap: usize,
84
85    /// Reap empty virtual queues after this many pop attempts without activity.
86    reap_after_pops: usize,
87}
88
89impl<T> WeightedFairQueue<T> {
90    /// Create a new weighted-fair queue with the given total capacity and reap
91    /// threshold. `reap_after_pops` controls how many empty queues persist
92    /// after draining before being garbage-collected.
93    pub fn new(capacity: usize, reap_after_pops: usize) -> Self {
94        Self {
95            queues: HashMap::new(),
96            db_order: VecDeque::new(),
97            total: 0,
98            capacity,
99            priorities: HashMap::new(),
100            pops_since_reap: 0,
101            reap_after_pops,
102        }
103    }
104
105    /// Attempt to enqueue `item` for `database_id`. Returns `Err(item)` if the
106    /// total queue has reached capacity.
107    pub fn try_enqueue(&mut self, database_id: u64, item: T) -> Result<(), T> {
108        if self.total >= self.capacity {
109            return Err(item);
110        }
111        if let std::collections::hash_map::Entry::Vacant(e) = self.queues.entry(database_id) {
112            e.insert(VirtualQueue::new());
113            self.db_order.push_back(database_id);
114        }
115        // Safe: we just ensured the key exists.
116        let vq = self.queues.get_mut(&database_id).expect("just inserted");
117        vq.items.push_back(item);
118        self.total += 1;
119        Ok(())
120    }
121
122    /// Set (or update) the priority class for a database. Applied on the next
123    /// `pop_next` call after this update.
124    pub fn set_priority(&mut self, database_id: u64, cls: PriorityClass) {
125        self.priorities.insert(database_id, cls);
126    }
127
128    /// Pop the next item using deficit round-robin across all virtual queues.
129    ///
130    /// Returns `None` if all virtual queues are empty.
131    ///
132    /// Each database is served for up to `priority_weight(class)` consecutive
133    /// items before the scheduler rotates to the next database. Deficit credits
134    /// are added once per turn (when a DB's deficit reaches zero and it re-enters
135    /// the front of the rotation) and carried across calls so databases with
136    /// lower arrival rates still accumulate credits fairly.
137    pub fn pop_next(&mut self) -> Option<T> {
138        if self.total == 0 {
139            return None;
140        }
141
142        // Walk the round-robin ring. We may need to skip empty queues, so we
143        // bound the scan to at most `n` DB rotations to avoid an infinite loop
144        // when all but one queue is empty.
145        let n = self.db_order.len();
146        for _ in 0..n {
147            let db_id = match self.db_order.front().copied() {
148                Some(id) => id,
149                None => break,
150            };
151
152            let vq = match self.queues.get_mut(&db_id) {
153                Some(vq) => vq,
154                None => {
155                    self.db_order.pop_front();
156                    continue;
157                }
158            };
159
160            // If this DB has no deficit remaining from its previous turn, grant
161            // a new quantum now (beginning of a new turn for this DB).
162            if vq.deficit == 0 {
163                let cls = self.priorities.get(&db_id).copied().unwrap_or_default();
164                vq.deficit = priority_weight(cls);
165            }
166
167            if let Some(item) = vq.items.pop_front() {
168                vq.deficit -= 1;
169                self.total -= 1;
170
171                // If this DB's deficit is now exhausted, rotate it to the back
172                // so the next DB gets its turn. Otherwise leave it at the front
173                // so we keep draining it next call.
174                if vq.deficit == 0 {
175                    self.db_order.pop_front();
176                    self.db_order.push_back(db_id);
177                }
178
179                self.pops_since_reap += 1;
180                if self.pops_since_reap >= self.reap_after_pops {
181                    self.reap_empty_queues();
182                    self.pops_since_reap = 0;
183                }
184                return Some(item);
185            } else {
186                // Queue drained; reset deficit so credits don't accumulate
187                // unboundedly for an inactive DB, then rotate to next.
188                vq.deficit = 0;
189                self.db_order.pop_front();
190                self.db_order.push_back(db_id);
191            }
192        }
193        None
194    }
195
196    /// Number of items queued for a specific database.
197    pub fn depth_for(&self, database_id: u64) -> usize {
198        self.queues
199            .get(&database_id)
200            .map(|vq| vq.items.len())
201            .unwrap_or(0)
202    }
203
204    /// Total items across all virtual queues.
205    pub fn total_depth(&self) -> usize {
206        self.total
207    }
208
209    /// Returns `true` if the given database's virtual queue has reached ≥ 85%
210    /// of its fair share of total capacity.
211    ///
212    /// Fair share = `capacity / active_databases` (floor division, min 1).
213    /// Databases with higher priority class receive proportionally more fair
214    /// share in the weight sense but the *slot* fair share is still equal
215    /// (per-DB slot pressure uses equal division to avoid one class starving
216    /// another's absolute headroom).
217    pub fn is_throttled_for(&self, database_id: u64) -> bool {
218        let depth = self.depth_for(database_id);
219        let fair_share = self.fair_share_slots();
220        depth * 100 >= fair_share * 85
221    }
222
223    /// Returns `true` if the given database's virtual queue has reached ≥ 95%
224    /// of its fair share of total capacity.
225    pub fn is_suspended_for(&self, database_id: u64) -> bool {
226        let depth = self.depth_for(database_id);
227        let fair_share = self.fair_share_slots();
228        depth * 100 >= fair_share * 95
229    }
230
231    /// Number of active virtual queues (including empty, not-yet-reaped ones).
232    pub fn active_database_count(&self) -> usize {
233        self.queues.len()
234    }
235
236    // ── Private helpers ───────────────────────────────────────────────────────
237
238    /// Slots allocated per database for fair-share computations.
239    ///
240    /// Active count is the number of databases that have been explicitly
241    /// registered (via `set_priority`) or have an active virtual queue.
242    /// This prevents the fair-share from inflating when only one DB has
243    /// enqueued items but multiple DBs are known to the scheduler.
244    fn fair_share_slots(&self) -> usize {
245        let active = self.priorities.len().max(self.queues.len()).max(1);
246        (self.capacity / active).max(1)
247    }
248
249    /// Remove virtual queues that have been empty for a full reap cycle.
250    fn reap_empty_queues(&mut self) {
251        let empty_ids: Vec<u64> = self
252            .queues
253            .iter()
254            .filter(|(_, vq)| vq.items.is_empty())
255            .map(|(&id, _)| id)
256            .collect();
257        for id in empty_ids {
258            self.queues.remove(&id);
259            self.db_order.retain(|&x| x != id);
260        }
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// With a single database the queue degenerates to plain FIFO.
    #[test]
    fn single_db_behaves_like_fifo() {
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(64, 100);
        for i in 0..8u32 {
            wfq.try_enqueue(1, i).unwrap();
        }
        for i in 0..8u32 {
            assert_eq!(wfq.pop_next(), Some(i));
        }
        assert_eq!(wfq.pop_next(), None);
    }

    /// Two `Standard` databases alternate in bursts of two (the Standard
    /// quantum) and each item comes out in per-database FIFO order.
    #[test]
    fn two_dbs_equal_priority_round_robin() {
        let mut wfq: WeightedFairQueue<(u64, u32)> = WeightedFairQueue::new(64, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Enqueue 4 items each.
        for i in 0..4u32 {
            wfq.try_enqueue(1, (1, i)).unwrap();
            wfq.try_enqueue(2, (2, i)).unwrap();
        }

        let mut db1_count = 0u32;
        let mut db2_count = 0u32;
        let mut order = Vec::new();
        while let Some((db, seq)) = wfq.pop_next() {
            match db {
                1 => {
                    // Per-database order must be preserved.
                    assert_eq!(seq, db1_count);
                    db1_count += 1;
                }
                2 => {
                    assert_eq!(seq, db2_count);
                    db2_count += 1;
                }
                _ => panic!("unexpected db"),
            }
            order.push(db);
        }
        // Equal priority → equal share.
        assert_eq!(db1_count, 4);
        assert_eq!(db2_count, 4);
        // Counting alone would pass for any queue discipline; pin the actual
        // interleaving: quantum 2 for each database, DB 1 first.
        assert_eq!(order, vec![1, 1, 2, 2, 1, 1, 2, 2]);
    }

    /// A `Critical` database is dispatched about four times as often as a
    /// `Bulk` one while both are backlogged.
    #[test]
    fn critical_drains_roughly_4x_faster_than_bulk() {
        let mut wfq: WeightedFairQueue<(u64, u32)> = WeightedFairQueue::new(256, 1000);
        wfq.set_priority(1, PriorityClass::Critical);
        wfq.set_priority(2, PriorityClass::Bulk);

        // Enqueue 80 items each.
        for i in 0..80u32 {
            wfq.try_enqueue(1, (1, i)).unwrap();
            wfq.try_enqueue(2, (2, i)).unwrap();
        }

        // Pop the first 20 items and count by DB.
        let mut critical_count = 0u32;
        let mut bulk_count = 0u32;
        for _ in 0..20 {
            match wfq.pop_next() {
                Some((1, _)) => critical_count += 1,
                Some((2, _)) => bulk_count += 1,
                // Both queues are still backlogged, so an empty pop or a
                // foreign database id is a scheduler bug, not noise.
                other => panic!("unexpected pop result: {:?}", other),
            }
        }
        // Critical weight=4, Bulk weight=1 → ratio ≥ 3:1 in first 20 pops.
        assert!(
            critical_count >= 3 * bulk_count,
            "critical={critical_count} bulk={bulk_count}: expected ≥ 3:1 ratio"
        );
    }

    /// One database sitting at its fair share must not stop another from
    /// enqueueing while total capacity remains.
    #[test]
    fn saturated_db_a_does_not_block_db_b() {
        let capacity = 8;
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(capacity, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Fill up fair share for DB 1 (4 out of 8 slots).
        for i in 0..4u32 {
            wfq.try_enqueue(1, i).unwrap();
        }

        // DB 2 should still be enqueueable.
        for i in 0..4u32 {
            assert!(
                wfq.try_enqueue(2, i).is_ok(),
                "DB 2 enqueue {i} should succeed while DB 1 occupies its fair share"
            );
        }
        assert_eq!(wfq.total_depth(), 8);
    }

    /// The total bound holds across databases and a rejected item is handed
    /// back to the caller.
    #[test]
    fn bound_total_never_exceeded() {
        let capacity = 4;
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(capacity, 100);

        // Fill to capacity across two databases.
        for i in 0..2u32 {
            wfq.try_enqueue(1, i).unwrap();
            wfq.try_enqueue(2, i).unwrap();
        }
        assert_eq!(wfq.total_depth(), capacity);

        // Next enqueue must fail regardless of which DB, returning the item.
        assert_eq!(wfq.try_enqueue(1, 99), Err(99));
        assert_eq!(wfq.try_enqueue(2, 99), Err(99));
        assert_eq!(wfq.try_enqueue(3, 99), Err(99));
        assert_eq!(wfq.total_depth(), capacity);
    }

    /// Throttle/suspend thresholds are evaluated per virtual queue against
    /// the per-database fair share.
    #[test]
    fn backpressure_thresholds_per_virtual_queue() {
        let mut wfq: WeightedFairQueue<u32> = WeightedFairQueue::new(8, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // Fair share = 8 / 2 = 4 per DB.
        // Push 3 items into DB 1 → 3/4 = 75% → not throttled.
        for _ in 0..3 {
            wfq.try_enqueue(1, 0).unwrap();
        }
        assert!(!wfq.is_throttled_for(1));
        assert!(!wfq.is_suspended_for(1));

        // Push 1 more → 4/4 = 100% → throttled AND suspended.
        wfq.try_enqueue(1, 0).unwrap();
        assert!(wfq.is_throttled_for(1));
        assert!(wfq.is_suspended_for(1));

        // DB 2 is untouched → not throttled.
        assert!(!wfq.is_throttled_for(2));
        assert!(!wfq.is_suspended_for(2));
    }
}
397}