// duroxide_cdb/leases.rs

1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU32, Ordering};
3
4/// Assigns dispatch slots to concurrent dispatcher tasks.
5/// Each dispatcher calls `acquire_slots()` to get its partition of the keyspace.
6#[async_trait::async_trait]
7pub trait LeaseProvider: Send + Sync {
8    /// Get the dispatch slots assigned to this caller.
9    /// On first call, assigns slots. On subsequent calls, returns cached assignment.
10    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;
11
12    /// Release slots when a dispatcher shuts down.
13    async fn release_slots(&self, caller_id: u64);
14}
15
16/// In-memory lease provider for single-runtime deployments (Phase 1).
17/// Distributes 256 dispatch slots evenly across N dispatchers.
18pub struct InMemoryLeaseProvider {
19    total: u32,
20    next_index: AtomicU32,
21    assignments: DashMap<u64, Vec<u8>>,
22}
23
24impl InMemoryLeaseProvider {
25    pub fn new(total_dispatchers: u32) -> Self {
26        Self {
27            total: total_dispatchers.max(1),
28            next_index: AtomicU32::new(0),
29            assignments: DashMap::new(),
30        }
31    }
32}
33
34#[async_trait::async_trait]
35impl LeaseProvider for InMemoryLeaseProvider {
36    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
37        self.assignments
38            .entry(caller_id)
39            .or_insert_with(|| {
40                let index = self.next_index.fetch_add(1, Ordering::SeqCst) % self.total;
41                (0u16..256)
42                    .filter(|s| (*s as u32) % self.total == index)
43                    .map(|s| s as u8)
44                    .collect()
45            })
46            .clone()
47    }
48
49    async fn release_slots(&self, caller_id: u64) {
50        self.assignments.remove(&caller_id);
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;

    /// A lone dispatcher owns the entire slot space.
    #[tokio::test]
    async fn test_slot_distribution_single_dispatcher() {
        let provider = InMemoryLeaseProvider::new(1);
        let slots = provider.acquire_slots(1).await;
        assert_eq!(slots.len(), 256);
    }

    /// Three dispatchers split the slot space into disjoint, near-equal parts.
    #[tokio::test]
    async fn test_slot_distribution_three_dispatchers() {
        let provider = InMemoryLeaseProvider::new(3);
        let s0 = provider.acquire_slots(100).await;
        let s1 = provider.acquire_slots(200).await;
        let s2 = provider.acquire_slots(300).await;

        // The union covers all 256 slots...
        let mut all = [s0.as_slice(), s1.as_slice(), s2.as_slice()].concat();
        all.sort_unstable();
        all.dedup();
        assert_eq!(all.len(), 256);
        // ...and the sizes add up to 256, so no slot is assigned twice.
        assert_eq!(s0.len() + s1.len() + s2.len(), 256);

        // Each dispatcher gets ~85-86 slots
        for slots in [&s0, &s1, &s2] {
            assert!((85..=86).contains(&slots.len()));
        }
    }

    /// Repeated acquisition by the same caller returns the cached assignment.
    #[tokio::test]
    async fn test_same_caller_gets_same_slots() {
        let provider = InMemoryLeaseProvider::new(2);
        let first = provider.acquire_slots(42).await;
        let second = provider.acquire_slots(42).await;
        assert_eq!(first, second);
    }

    /// After a release, the callers that hold leases still partition the slots.
    #[tokio::test]
    async fn test_release_allows_reassignment() {
        let provider = InMemoryLeaseProvider::new(2);
        let first = provider.acquire_slots(42).await;
        provider.release_slots(42).await;
        // Another caller acquires before the original caller comes back.
        let other = provider.acquire_slots(99).await;
        let second = provider.acquire_slots(42).await;
        // The slot assignment may differ after release
        assert!(!first.is_empty());
        assert!(!second.is_empty());
        // The two live leases must not share a slot, and together cover everything.
        assert!(other.iter().all(|slot| !second.contains(slot)));
        assert_eq!(other.len() + second.len(), 256);
    }
}