// Source: atomr_persistence/alod.rs

//! At-least-once delivery.
2
use std::collections::BTreeMap;

use parking_lot::Mutex;
6
/// A delivery that has been registered but not yet confirmed.
///
/// This is the record kept for every pending delivery and the value handed
/// back to the caller when pending deliveries are listed for resending.
#[derive(Debug, Clone)]
pub struct UnconfirmedDelivery<M> {
    /// Identifier assigned when the delivery was registered; it is the key
    /// used to confirm the delivery later.
    pub delivery_id: u64,
    /// Caller-supplied name of the recipient. Stored as given; this module
    /// never interprets it.
    pub destination: String,
    /// The payload to (re)send.
    pub message: M,
}
13
14pub struct AtLeastOnceDelivery<M: Clone + Send + 'static> {
15    inner: Mutex<Inner<M>>,
16    redeliver_interval_ms: u64,
17    warn_after_attempts: u32,
18    max_unconfirmed: usize,
19}
20
21struct Inner<M: Clone + Send + 'static> {
22    next_id: u64,
23    unconfirmed: BTreeMap<u64, (UnconfirmedDelivery<M>, u32)>,
24}
25
26impl<M: Clone + Send + 'static> AtLeastOnceDelivery<M> {
27    pub fn new(redeliver_interval_ms: u64, warn_after: u32, max_unconfirmed: usize) -> Self {
28        Self {
29            inner: Mutex::new(Inner { next_id: 0, unconfirmed: BTreeMap::new() }),
30            redeliver_interval_ms,
31            warn_after_attempts: warn_after,
32            max_unconfirmed,
33        }
34    }
35
36    pub fn deliver(&self, destination: impl Into<String>, message: M) -> Option<u64> {
37        let mut inner = self.inner.lock();
38        if inner.unconfirmed.len() >= self.max_unconfirmed {
39            return None;
40        }
41        inner.next_id += 1;
42        let id = inner.next_id;
43        inner.unconfirmed.insert(
44            id,
45            (UnconfirmedDelivery { delivery_id: id, destination: destination.into(), message }, 0),
46        );
47        Some(id)
48    }
49
50    pub fn confirm_delivery(&self, id: u64) -> bool {
51        self.inner.lock().unconfirmed.remove(&id).is_some()
52    }
53
54    pub fn redeliver(&self) -> Vec<UnconfirmedDelivery<M>> {
55        let mut inner = self.inner.lock();
56        let mut out = Vec::new();
57        for (_, (d, attempts)) in inner.unconfirmed.iter_mut() {
58            *attempts += 1;
59            out.push(d.clone());
60        }
61        out
62    }
63
64    pub fn warn_threshold(&self) -> u32 {
65        self.warn_after_attempts
66    }
67
68    pub fn redeliver_interval_ms(&self) -> u64 {
69        self.redeliver_interval_ms
70    }
71
72    pub fn unconfirmed_count(&self) -> usize {
73        self.inner.lock().unconfirmed.len()
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Confirming a registered delivery removes it from the pending set.
    #[test]
    fn confirms_remove_from_pending() {
        let alod = AtLeastOnceDelivery::<String>::new(500, 5, 100);
        let id = alod.deliver("dst", "hi".into()).unwrap();
        assert_eq!(alod.unconfirmed_count(), 1);
        assert!(alod.confirm_delivery(id));
        assert_eq!(alod.unconfirmed_count(), 0);
    }

    /// Every unconfirmed delivery is returned for redelivery.
    #[test]
    fn redeliver_yields_unconfirmed() {
        let alod = AtLeastOnceDelivery::<u32>::new(500, 5, 100);
        alod.deliver("a", 1);
        alod.deliver("b", 2);
        assert_eq!(alod.redeliver().len(), 2);
    }

    /// Once `max_unconfirmed` deliveries are pending, further ones are refused
    /// and nothing is recorded for them.
    #[test]
    fn deliver_refuses_when_at_capacity() {
        let alod = AtLeastOnceDelivery::<u32>::new(500, 5, 1);
        assert!(alod.deliver("a", 1).is_some());
        assert_eq!(alod.deliver("b", 2), None);
        assert_eq!(alod.unconfirmed_count(), 1);
    }

    /// Confirming an id that was never handed out reports `false`.
    #[test]
    fn confirming_unknown_id_returns_false() {
        let alod = AtLeastOnceDelivery::<u32>::new(500, 5, 100);
        assert!(!alod.confirm_delivery(42));
    }
}