use dashmap::DashMap;
use std::sync::atomic::{AtomicU32, Ordering};
/// Abstraction over a component that hands out and takes back slot leases.
///
/// Implementors must be `Send + Sync`, so a provider can be shared across
/// threads/tasks.
#[async_trait::async_trait]
pub trait LeaseProvider: Send + Sync {
/// Returns the slots (as `u8` values) assigned to `caller_id`.
async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;
/// Gives up whatever assignment is recorded for `caller_id`.
async fn release_slots(&self, caller_id: u64);
}
/// Process-local lease provider that splits the slot space `0..=255` into
/// `total` partitions (slot `s` belongs to partition `s % total`) and gives
/// each new caller one partition.
pub struct InMemoryLeaseProvider {
// Number of partitions; `new` clamps this to at least 1 so it is a valid modulus.
total: u32,
// Counter bumped once per newly inserted caller; its value modulo `total`
// selects that caller's partition.
next_index: AtomicU32,
// Current assignments, keyed by caller id; the value is the caller's slot list.
assignments: DashMap<u64, Vec<u8>>,
}
impl InMemoryLeaseProvider {
    /// Creates a provider that divides the 256 slots among `total_dispatchers`
    /// partitions.
    ///
    /// A value of `0` is treated as `1`. The provider starts with no
    /// assignments and with the partition counter at zero.
    pub fn new(total_dispatchers: u32) -> Self {
        // `total` is used as a modulus when slots are assigned, so it must
        // never be zero.
        let total = if total_dispatchers == 0 {
            1
        } else {
            total_dispatchers
        };
        let next_index = AtomicU32::new(0);
        let assignments = DashMap::new();

        Self {
            total,
            next_index,
            assignments,
        }
    }
}
#[async_trait::async_trait]
impl LeaseProvider for InMemoryLeaseProvider {
    /// Returns the slot list for `caller_id`, creating it on first use.
    ///
    /// A caller that already has an entry gets a copy of the stored list. A new
    /// caller takes the next counter value modulo `total` as its partition and
    /// receives every slot `s` in `0..=255` with `s % total == partition`, in
    /// ascending order. If the partition number is 256 or more the list is empty.
    ///
    /// NOTE(review): the counter only ever moves forward and `release_slots`
    /// does not hand partitions back, so after a release a newcomer can be given
    /// a partition that another active caller still holds.
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
        let lease = self.assignments.entry(caller_id).or_insert_with(|| {
            let partition = self.next_index.fetch_add(1, Ordering::SeqCst) % self.total;
            let stride = self.total as usize;
            // partition, partition + total, partition + 2 * total, ... below 256:
            // exactly the slots congruent to `partition` modulo `total`.
            (partition..256)
                .step_by(stride)
                .map(|slot| slot as u8)
                .collect()
        });
        lease.clone()
    }

    /// Removes the entry for `caller_id`, if any. The partition counter is left
    /// untouched.
    async fn release_slots(&self, caller_id: u64) {
        self.assignments.remove(&caller_id);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With one dispatcher, the sole partition holds all 256 slots.
    #[tokio::test]
    async fn test_slot_distribution_single_dispatcher() {
        let provider = InMemoryLeaseProvider::new(1);
        let owned = provider.acquire_slots(1).await;
        assert_eq!(owned.len(), 256);
    }

    /// Three distinct callers together cover every slot, each holding 85 or 86.
    #[tokio::test]
    async fn test_slot_distribution_three_dispatchers() {
        let provider = InMemoryLeaseProvider::new(3);
        let s0 = provider.acquire_slots(100).await;
        let s1 = provider.acquire_slots(200).await;
        let s2 = provider.acquire_slots(300).await;

        let mut merged: Vec<u8> = s0
            .iter()
            .chain(s1.iter())
            .chain(s2.iter())
            .copied()
            .collect();
        merged.sort();
        merged.dedup();
        assert_eq!(merged.len(), 256);

        for share in [&s0, &s1, &s2] {
            assert!((85..=86).contains(&share.len()));
        }
    }

    /// Acquiring twice with the same caller id returns the same slot list.
    #[tokio::test]
    async fn test_same_caller_gets_same_slots() {
        let provider = InMemoryLeaseProvider::new(2);
        let initial = provider.acquire_slots(42).await;
        let repeated = provider.acquire_slots(42).await;
        assert_eq!(initial, repeated);
    }

    /// A caller that released its lease can acquire a non-empty one again,
    /// even after another caller has acquired in between.
    #[tokio::test]
    async fn test_release_allows_reassignment() {
        let provider = InMemoryLeaseProvider::new(2);
        let before = provider.acquire_slots(42).await;
        provider.release_slots(42).await;
        let _other = provider.acquire_slots(99).await;
        let after = provider.acquire_slots(42).await;
        assert!(!before.is_empty());
        assert!(!after.is_empty());
    }
}