use std::collections::{HashMap, VecDeque};
use nodedb_types::PriorityClass;
/// Returns the scheduling weight of a priority class.
///
/// The weight is the number of items a database of that class may pop in a
/// single turn of [`WeightedFairQueue::pop_next`] before the rotation moves
/// on, so `Critical` (4) drains four times as fast as `Bulk` (1) and twice as
/// fast as `Standard` (2) when all of them are backlogged.
pub fn priority_weight(cls: PriorityClass) -> u32 {
    const CRITICAL_WEIGHT: u32 = 4;
    const STANDARD_WEIGHT: u32 = 2;
    const BULK_WEIGHT: u32 = 1;

    // Exhaustive on purpose: adding a variant must force a decision here.
    match cls {
        PriorityClass::Bulk => BULK_WEIGHT,
        PriorityClass::Standard => STANDARD_WEIGHT,
        PriorityClass::Critical => CRITICAL_WEIGHT,
    }
}
/// FIFO backlog of a single database together with its round-robin budget.
struct VirtualQueue<T> {
    /// Pending items in arrival order; the front is served first.
    items: VecDeque<T>,
    /// Pops this database may still take in its current turn. Zero means the
    /// turn has not started (or has just ended) and the budget must be
    /// refilled from the database's priority weight.
    deficit: u32,
}

impl<T> VirtualQueue<T> {
    /// Creates an empty queue whose turn budget has not been filled yet.
    fn new() -> Self {
        let items = VecDeque::new();
        VirtualQueue { items, deficit: 0 }
    }
}
/// Bounded multi-database queue served in weighted round-robin order.
///
/// Every database id owns a FIFO virtual queue. [`pop_next`](Self::pop_next)
/// walks the databases in rotation and lets each one pop up to
/// `priority_weight(class)` items per turn, so higher-weight classes drain
/// proportionally faster while every backlogged database is still visited.
/// All virtual queues share one global `capacity` bound.
pub struct WeightedFairQueue<T> {
    /// Per-database virtual queues, keyed by database id.
    queues: HashMap<u64, VirtualQueue<T>>,
    /// Rotation order; the front entry is the database currently being served.
    db_order: VecDeque<u64>,
    /// Number of items currently queued across all databases.
    total: usize,
    /// Upper bound on `total`; enqueues are rejected once it is reached.
    capacity: usize,
    /// Priority class per database. Databases without an entry are scheduled
    /// with `PriorityClass::default()`.
    priorities: HashMap<u64, PriorityClass>,
    /// Successful pops since empty virtual queues were last reaped.
    pops_since_reap: usize,
    /// Number of successful pops between two reaping passes.
    reap_after_pops: usize,
}

impl<T> WeightedFairQueue<T> {
    /// Percentage of the fair share at which a database counts as throttled.
    const THROTTLE_PERCENT: u128 = 85;
    /// Percentage of the fair share at which a database counts as suspended.
    const SUSPEND_PERCENT: u128 = 95;

    /// Creates an empty queue.
    ///
    /// * `capacity` - maximum number of items held across all databases.
    /// * `reap_after_pops` - empty virtual queues are discarded every time
    ///   this many items have been popped (a value of 0 or 1 reaps after
    ///   every pop).
    pub fn new(capacity: usize, reap_after_pops: usize) -> Self {
        Self {
            queues: HashMap::new(),
            db_order: VecDeque::new(),
            total: 0,
            capacity,
            priorities: HashMap::new(),
            pops_since_reap: 0,
            reap_after_pops,
        }
    }

    /// Appends `item` to the virtual queue of `database_id`.
    ///
    /// A database seen for the first time is added to the back of the
    /// rotation.
    ///
    /// # Errors
    ///
    /// Hands `item` back as `Err(item)` when the queue already holds
    /// `capacity` items; nothing is modified in that case.
    pub fn try_enqueue(&mut self, database_id: u64, item: T) -> Result<(), T> {
        use std::collections::hash_map::Entry;

        if self.total >= self.capacity {
            return Err(item);
        }

        // Single map lookup: reuse the existing queue or create one and
        // register the database in the rotation.
        let vq = match self.queues.entry(database_id) {
            Entry::Occupied(slot) => slot.into_mut(),
            Entry::Vacant(slot) => {
                self.db_order.push_back(database_id);
                slot.insert(VirtualQueue::new())
            }
        };
        vq.items.push_back(item);
        self.total += 1;
        Ok(())
    }

    /// Sets (or replaces) the priority class of `database_id`.
    ///
    /// The new class is picked up the next time the database's turn budget is
    /// refilled; a turn that is already in progress keeps its old budget.
    pub fn set_priority(&mut self, database_id: u64, cls: PriorityClass) {
        self.priorities.insert(database_id, cls);
    }

    /// Removes and returns the next item in weighted round-robin order, or
    /// `None` when nothing is queued.
    ///
    /// The database at the front of the rotation keeps being served until it
    /// has used up its turn budget (`priority_weight` of its class) or runs
    /// out of items, after which it moves to the back of the rotation.
    pub fn pop_next(&mut self) -> Option<T> {
        if self.total == 0 {
            return None;
        }

        // One full lap over the rotation is enough: every visited database
        // either yields an item or is moved behind the ones not yet visited.
        let lap = self.db_order.len();
        for _ in 0..lap {
            let db_id = match self.db_order.front() {
                Some(&id) => id,
                None => break,
            };
            let vq = match self.queues.get_mut(&db_id) {
                Some(vq) => vq,
                None => {
                    // Rotation entry without a queue: drop it and move on.
                    self.db_order.pop_front();
                    continue;
                }
            };

            // A zero budget marks the start of a new turn for this database.
            if vq.deficit == 0 {
                let cls = self.priorities.get(&db_id).copied().unwrap_or_default();
                vq.deficit = priority_weight(cls);
            }

            match vq.items.pop_front() {
                Some(item) => {
                    vq.deficit -= 1;
                    let turn_finished = vq.deficit == 0;
                    self.total -= 1;
                    if turn_finished {
                        // `db_id` is the front entry, so this sends it to the back.
                        self.db_order.rotate_left(1);
                    }
                    self.record_pop();
                    return Some(item);
                }
                None => {
                    // Nothing to serve: forfeit the rest of the turn.
                    vq.deficit = 0;
                    self.db_order.rotate_left(1);
                }
            }
        }
        None
    }

    /// Number of items currently queued for `database_id` (0 if unknown).
    pub fn depth_for(&self, database_id: u64) -> usize {
        self.queues
            .get(&database_id)
            .map_or(0, |vq| vq.items.len())
    }

    /// Number of items currently queued across all databases.
    pub fn total_depth(&self) -> usize {
        self.total
    }

    /// Returns `true` once the backlog of `database_id` has reached 85% of
    /// its fair share of the capacity.
    pub fn is_throttled_for(&self, database_id: u64) -> bool {
        self.depth_reaches(database_id, Self::THROTTLE_PERCENT)
    }

    /// Returns `true` once the backlog of `database_id` has reached 95% of
    /// its fair share of the capacity.
    pub fn is_suspended_for(&self, database_id: u64) -> bool {
        self.depth_reaches(database_id, Self::SUSPEND_PERCENT)
    }

    /// Number of databases that currently own a virtual queue. Queues that
    /// have run empty are still counted until the next reaping pass.
    pub fn active_database_count(&self) -> usize {
        self.queues.len()
    }

    /// Whether the backlog of `database_id` is at least `percent` percent of
    /// the fair share.
    fn depth_reaches(&self, database_id: u64, percent: u128) -> bool {
        // Widening casts: the products are computed in u128 so they cannot
        // overflow even when `capacity` is as large as `usize::MAX`.
        let depth = self.depth_for(database_id) as u128;
        let fair_share = self.fair_share_slots() as u128;
        depth * 100 >= fair_share * percent
    }

    /// Capacity slots each database is entitled to: `capacity` divided by the
    /// larger of "databases with an explicit priority" and "databases with a
    /// virtual queue", never less than one slot.
    fn fair_share_slots(&self) -> usize {
        let active = self.priorities.len().max(self.queues.len()).max(1);
        (self.capacity / active).max(1)
    }

    /// Counts one successful pop and runs a reaping pass when the configured
    /// number of pops has accumulated.
    fn record_pop(&mut self) {
        self.pops_since_reap += 1;
        if self.pops_since_reap >= self.reap_after_pops {
            self.reap_empty_queues();
            self.pops_since_reap = 0;
        }
    }

    /// Discards every empty virtual queue and removes its database from the
    /// rotation. Priorities are kept, so a database that comes back is
    /// scheduled with the class it had before.
    fn reap_empty_queues(&mut self) {
        self.queues.retain(|_, vq| !vq.items.is_empty());
        // One linear pass over the rotation instead of one pass per reaped id.
        let live = &self.queues;
        self.db_order.retain(|id| live.contains_key(id));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With a single database the queue degenerates to plain FIFO.
    #[test]
    fn single_db_behaves_like_fifo() {
        let mut wfq = WeightedFairQueue::<u32>::new(64, 100);
        for value in 0..8u32 {
            wfq.try_enqueue(1, value).unwrap();
        }

        // Nine pops: the eight queued values in arrival order, then exhaustion.
        let popped: Vec<Option<u32>> = (0..9).map(|_| wfq.pop_next()).collect();
        let expected: Vec<Option<u32>> = (0..8u32)
            .map(Some)
            .chain(std::iter::once(None))
            .collect();
        assert_eq!(popped, expected);
    }

    /// Two equally weighted databases are both drained completely.
    #[test]
    fn two_dbs_equal_priority_round_robin() {
        let mut wfq = WeightedFairQueue::<(u64, u32)>::new(64, 100);
        for db in [1u64, 2] {
            wfq.set_priority(db, PriorityClass::Standard);
        }
        for seq in 0..4u32 {
            for db in [1u64, 2] {
                wfq.try_enqueue(db, (db, seq)).unwrap();
            }
        }

        // served[0] counts database 1, served[1] counts database 2.
        let mut served = [0u32; 2];
        while let Some((db, _)) = wfq.pop_next() {
            match db {
                1 => served[0] += 1,
                2 => served[1] += 1,
                _ => panic!("unexpected db"),
            }
        }
        assert_eq!(served, [4, 4]);
    }

    /// A critical database gets at least three pops for every bulk pop.
    #[test]
    fn critical_drains_roughly_4x_faster_than_bulk() {
        let mut wfq = WeightedFairQueue::<(u64, u32)>::new(256, 1000);
        wfq.set_priority(1, PriorityClass::Critical);
        wfq.set_priority(2, PriorityClass::Bulk);
        for seq in 0..80u32 {
            wfq.try_enqueue(1, (1, seq)).unwrap();
            wfq.try_enqueue(2, (2, seq)).unwrap();
        }

        // Sample the first 20 pops; empty pops and unknown ids are ignored.
        let (mut critical_count, mut bulk_count) = (0u32, 0u32);
        for (db, _) in (0..20).filter_map(|_| wfq.pop_next()) {
            match db {
                1 => critical_count += 1,
                2 => bulk_count += 1,
                _ => {}
            }
        }
        assert!(
            critical_count >= 3 * bulk_count,
            "critical={critical_count} bulk={bulk_count}: expected ≥ 3:1 ratio"
        );
    }

    /// One database sitting at its fair share leaves room for the other.
    #[test]
    fn saturated_db_a_does_not_block_db_b() {
        let capacity = 8;
        let mut wfq = WeightedFairQueue::<u32>::new(capacity, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        (0..4u32).for_each(|item| wfq.try_enqueue(1, item).unwrap());

        for i in 0..4u32 {
            let accepted = wfq.try_enqueue(2, i).is_ok();
            assert!(
                accepted,
                "DB 2 enqueue {i} should succeed while DB 1 occupies its fair share"
            );
        }
        assert_eq!(wfq.total_depth(), 8);
    }

    /// Once the global capacity is reached every enqueue is rejected,
    /// whether the database is already known or new.
    #[test]
    fn bound_total_never_exceeded() {
        let capacity = 4;
        let mut wfq = WeightedFairQueue::<u32>::new(capacity, 100);
        for (db, item) in [(1u64, 0u32), (2, 0), (1, 1), (2, 1)] {
            wfq.try_enqueue(db, item).unwrap();
        }
        assert_eq!(wfq.total_depth(), capacity);

        for db in [1u64, 2, 3] {
            assert!(wfq.try_enqueue(db, 99).is_err());
        }
    }

    /// Throttle/suspend thresholds are evaluated per database against its
    /// fair share (8 slots / 2 databases = 4).
    #[test]
    fn backpressure_thresholds_per_virtual_queue() {
        let mut wfq = WeightedFairQueue::<u32>::new(8, 100);
        wfq.set_priority(1, PriorityClass::Standard);
        wfq.set_priority(2, PriorityClass::Standard);

        // 3 of 4 slots: below both thresholds.
        for _ in 0..3 {
            wfq.try_enqueue(1, 0).unwrap();
        }
        let (throttled, suspended) = (wfq.is_throttled_for(1), wfq.is_suspended_for(1));
        assert!(!throttled);
        assert!(!suspended);

        // 4 of 4 slots: above both thresholds, while database 2 is unaffected.
        wfq.try_enqueue(1, 0).unwrap();
        assert!(wfq.is_throttled_for(1));
        assert!(wfq.is_suspended_for(1));
        assert!(!wfq.is_throttled_for(2));
    }
}