//! duroxide-cdb 0.1.10
//!
//! A CosmosDB-based provider implementation for Duroxide, a durable task orchestration framework.
//! Documentation
use dashmap::DashMap;
use std::sync::atomic::{AtomicU32, Ordering};

/// Assigns dispatch slots to concurrent dispatcher tasks.
/// Each dispatcher calls `acquire_slots()` to get its partition of the keyspace.
///
/// Slots are identified by `u8` values. Implementors must be `Send + Sync`
/// (required by the supertrait bounds), so a provider can be shared between
/// dispatcher tasks.
#[async_trait::async_trait]
pub trait LeaseProvider: Send + Sync {
    /// Get the dispatch slots assigned to this caller.
    /// On first call, assigns slots. On subsequent calls, returns cached assignment.
    ///
    /// `caller_id` identifies the dispatcher asking for slots. The returned
    /// vector lists the slot numbers assigned to that caller.
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8>;

    /// Release slots when a dispatcher shuts down.
    ///
    /// `caller_id` is the identifier that was previously passed to
    /// `acquire_slots`.
    async fn release_slots(&self, caller_id: u64);
}

/// In-memory lease provider for single-runtime deployments (Phase 1).
/// Distributes 256 dispatch slots evenly across N dispatchers.
///
/// The slot space is split into `total` partitions; partition `i` owns every
/// slot `s` with `s % total == i`. Each caller is bound to one partition until
/// it releases it. A released partition is handed to the next new caller
/// before the round-robin counter advances, so a dispatcher that shuts down
/// does not leave its slots unowned while a replacement is given slots that
/// another live dispatcher already holds.
///
/// If more callers hold assignments than there are partitions, partitions are
/// shared (the counter wraps around). If `total` exceeds 256, partitions with
/// an index of 256 or more contain no slots.
pub struct InMemoryLeaseProvider {
    /// Number of partitions the slot space is split into (always >= 1).
    total: u32,
    /// Round-robin counter, used only when no released partition is available.
    next_index: AtomicU32,
    /// Live assignments: caller id -> (partition index, slots of that partition).
    assignments: DashMap<u64, (u32, Vec<u8>)>,
    /// Partition indices that were released and are not held by any caller.
    /// The mutex also serialises allocation and release, so an index cannot be
    /// handed out twice while it is moving in or out of this set.
    released: std::sync::Mutex<std::collections::BTreeSet<u32>>,
}

impl InMemoryLeaseProvider {
    /// Creates a provider that splits the 256 slots across `total_dispatchers`
    /// partitions. A value of `0` is treated as `1` to avoid a division by zero.
    pub fn new(total_dispatchers: u32) -> Self {
        Self {
            total: total_dispatchers.max(1),
            next_index: AtomicU32::new(0),
            assignments: DashMap::new(),
            released: std::sync::Mutex::new(std::collections::BTreeSet::new()),
        }
    }

    /// Returns, in ascending order, the slots that belong to partition `index`.
    fn slots_for(&self, index: u32) -> Vec<u8> {
        (0u16..256)
            .filter(|slot| u32::from(*slot) % self.total == index)
            // Lossless: every value in 0..256 fits in a u8.
            .map(|slot| slot as u8)
            .collect()
    }

    /// Locks the released-index set. A poisoned lock is recovered because the
    /// set stays structurally valid even if a holder panicked.
    fn lock_released(&self) -> std::sync::MutexGuard<'_, std::collections::BTreeSet<u32>> {
        self.released
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Returns the caller's cached slots, or binds the caller to a partition
    /// (lowest released index first, otherwise the next round-robin index).
    fn assign(&self, caller_id: u64) -> Vec<u8> {
        // Fast path: an existing assignment needs no allocation lock.
        if let Some(existing) = self.assignments.get(&caller_id) {
            return existing.value().1.clone();
        }

        // Lock order is always "released set, then map shard" (same as in
        // `unassign`), so the two paths cannot deadlock against each other.
        let mut released = self.lock_released();
        let slots = self
            .assignments
            .entry(caller_id)
            // If another task assigned this caller while we waited for the
            // lock, the existing entry is returned and nothing is allocated.
            .or_insert_with(|| {
                let recycled = released.iter().next().copied();
                let index = match recycled {
                    Some(free_index) => {
                        released.remove(&free_index);
                        free_index
                    }
                    None => self.next_index.fetch_add(1, Ordering::SeqCst) % self.total,
                };
                (index, self.slots_for(index))
            })
            .value()
            .1
            .clone();
        drop(released);
        slots
    }

    /// Drops the caller's assignment and makes its partition reusable once no
    /// other caller holds the same partition. Unknown callers are ignored.
    fn unassign(&self, caller_id: u64) {
        let mut released = self.lock_released();
        if let Some((_, (index, _))) = self.assignments.remove(&caller_id) {
            // When partitions are shared (more callers than partitions), only
            // recycle the index after its last holder has gone.
            let still_held = self
                .assignments
                .iter()
                .any(|entry| entry.value().0 == index);
            if !still_held {
                released.insert(index);
            }
        }
    }
}

#[async_trait::async_trait]
impl LeaseProvider for InMemoryLeaseProvider {
    /// Returns the slots for `caller_id`, assigning a partition on first use.
    async fn acquire_slots(&self, caller_id: u64) -> Vec<u8> {
        self.assign(caller_id)
    }

    /// Forgets the assignment of `caller_id` and frees its partition for reuse.
    async fn release_slots(&self, caller_id: u64) {
        self.unassign(caller_id);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A lone dispatcher owns the entire slot space, in ascending order.
    #[tokio::test]
    async fn test_slot_distribution_single_dispatcher() {
        let lp = InMemoryLeaseProvider::new(1);
        let slots = lp.acquire_slots(1).await;
        assert_eq!(slots.len(), 256);
        assert_eq!(slots, (0..=255u8).collect::<Vec<u8>>());
    }

    /// Three dispatchers split the slot space without gaps or overlap.
    #[tokio::test]
    async fn test_slot_distribution_three_dispatchers() {
        let lp = InMemoryLeaseProvider::new(3);
        let s0 = lp.acquire_slots(100).await;
        let s1 = lp.acquire_slots(200).await;
        let s2 = lp.acquire_slots(300).await;

        let mut all: Vec<u8> = [s0.as_slice(), s1.as_slice(), s2.as_slice()].concat();

        // No overlap: the combined length must be 256 *before* deduplication,
        // otherwise `dedup` would silently hide slots owned twice.
        assert_eq!(all.len(), 256);

        // All 256 slots covered
        all.sort_unstable();
        all.dedup();
        assert_eq!(all.len(), 256);

        // Each dispatcher gets ~85-86 slots
        assert!(s0.len() >= 85 && s0.len() <= 86);
        assert!(s1.len() >= 85 && s1.len() <= 86);
        assert!(s2.len() >= 85 && s2.len() <= 86);
    }

    /// Repeated acquisition by the same caller returns the cached assignment.
    #[tokio::test]
    async fn test_same_caller_gets_same_slots() {
        let lp = InMemoryLeaseProvider::new(2);
        let first = lp.acquire_slots(42).await;
        let second = lp.acquire_slots(42).await;
        assert!(!first.is_empty());
        assert_eq!(first, second);
    }

    /// After a release, the two live dispatchers still partition the slot space.
    #[tokio::test]
    async fn test_release_allows_reassignment() {
        let lp = InMemoryLeaseProvider::new(2);
        let first = lp.acquire_slots(42).await;
        lp.release_slots(42).await;
        // A different caller acquires in between, then the original caller
        // comes back.
        let other = lp.acquire_slots(99).await;
        let second = lp.acquire_slots(42).await;
        // The slot assignment may differ after release
        assert!(!first.is_empty());
        assert!(!second.is_empty());

        // The two callers that currently hold assignments must not share a
        // slot and must cover every slot between them.
        let mut live: Vec<u8> = [other.as_slice(), second.as_slice()].concat();
        assert_eq!(live.len(), 256);
        live.sort_unstable();
        live.dedup();
        assert_eq!(live.len(), 256);
    }
}