1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4#[async_trait::async_trait]
7pub trait LeaseProvider: Send + Sync {
8 async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;
11
12 async fn release_slots(&self, caller_id: u64);
14}
15
16pub struct InMemoryLeaseProvider {
19 total: u32,
20 next_index: AtomicU32,
21 assignments: DashMap<u64, Vec<u8>>,
22}
23
24impl InMemoryLeaseProvider {
25 pub fn new(total_dispatchers: u32) -> Self {
26 Self {
27 total: total_dispatchers.max(1),
28 next_index: AtomicU32::new(0),
29 assignments: DashMap::new(),
30 }
31 }
32}
33
34#[async_trait::async_trait]
35impl LeaseProvider for InMemoryLeaseProvider {
36 async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
37 self.assignments
38 .entry(caller_id)
39 .or_insert_with(|| {
40 let index = self.next_index.fetch_add(1, Ordering::SeqCst) % self.total;
41 (0u16..256)
42 .filter(|s| (*s as u32) % self.total == index)
43 .map(|s| s as u8)
44 .collect()
45 })
46 .clone()
47 }
48
49 async fn release_slots(&self, caller_id: u64) {
50 self.assignments.remove(&caller_id);
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use super::*;
57
58 #[tokio::test]
59 async fn test_slot_distribution_single_dispatcher() {
60 let lp = InMemoryLeaseProvider::new(1);
61 let slots = lp.acquire_slots(1).await;
62 assert_eq!(slots.len(), 256);
63 }
64
65 #[tokio::test]
66 async fn test_slot_distribution_three_dispatchers() {
67 let lp = InMemoryLeaseProvider::new(3);
68 let s0 = lp.acquire_slots(100).await;
69 let s1 = lp.acquire_slots(200).await;
70 let s2 = lp.acquire_slots(300).await;
71
72 let mut all: Vec<u8> = Vec::new();
74 all.extend_from_slice(&s0);
75 all.extend_from_slice(&s1);
76 all.extend_from_slice(&s2);
77 all.sort();
78 all.dedup();
79 assert_eq!(all.len(), 256);
80
81 assert!(s0.len() >= 85 && s0.len() <= 86);
83 assert!(s1.len() >= 85 && s1.len() <= 86);
84 assert!(s2.len() >= 85 && s2.len() <= 86);
85 }
86
87 #[tokio::test]
88 async fn test_same_caller_gets_same_slots() {
89 let lp = InMemoryLeaseProvider::new(2);
90 let first = lp.acquire_slots(42).await;
91 let second = lp.acquire_slots(42).await;
92 assert_eq!(first, second);
93 }
94
95 #[tokio::test]
96 async fn test_release_allows_reassignment() {
97 let lp = InMemoryLeaseProvider::new(2);
98 let first = lp.acquire_slots(42).await;
99 lp.release_slots(42).await;
100 let _other = lp.acquire_slots(99).await;
102 let second = lp.acquire_slots(42).await;
103 assert!(!first.is_empty());
105 assert!(!second.is_empty());
106 }
107}