atomr_persistence/
alod.rs1use std::collections::BTreeMap;
4
5use parking_lot::Mutex;
6
7#[derive(Debug, Clone)]
8pub struct UnconfirmedDelivery<M> {
9 pub delivery_id: u64,
10 pub destination: String,
11 pub message: M,
12}
13
14pub struct AtLeastOnceDelivery<M: Clone + Send + 'static> {
15 inner: Mutex<Inner<M>>,
16 redeliver_interval_ms: u64,
17 warn_after_attempts: u32,
18 max_unconfirmed: usize,
19}
20
21struct Inner<M: Clone + Send + 'static> {
22 next_id: u64,
23 unconfirmed: BTreeMap<u64, (UnconfirmedDelivery<M>, u32)>,
24}
25
26impl<M: Clone + Send + 'static> AtLeastOnceDelivery<M> {
27 pub fn new(redeliver_interval_ms: u64, warn_after: u32, max_unconfirmed: usize) -> Self {
28 Self {
29 inner: Mutex::new(Inner { next_id: 0, unconfirmed: BTreeMap::new() }),
30 redeliver_interval_ms,
31 warn_after_attempts: warn_after,
32 max_unconfirmed,
33 }
34 }
35
36 pub fn deliver(&self, destination: impl Into<String>, message: M) -> Option<u64> {
37 let mut inner = self.inner.lock();
38 if inner.unconfirmed.len() >= self.max_unconfirmed {
39 return None;
40 }
41 inner.next_id += 1;
42 let id = inner.next_id;
43 inner.unconfirmed.insert(
44 id,
45 (UnconfirmedDelivery { delivery_id: id, destination: destination.into(), message }, 0),
46 );
47 Some(id)
48 }
49
50 pub fn confirm_delivery(&self, id: u64) -> bool {
51 self.inner.lock().unconfirmed.remove(&id).is_some()
52 }
53
54 pub fn redeliver(&self) -> Vec<UnconfirmedDelivery<M>> {
55 let mut inner = self.inner.lock();
56 let mut out = Vec::new();
57 for (_, (d, attempts)) in inner.unconfirmed.iter_mut() {
58 *attempts += 1;
59 out.push(d.clone());
60 }
61 out
62 }
63
64 pub fn warn_threshold(&self) -> u32 {
65 self.warn_after_attempts
66 }
67
68 pub fn redeliver_interval_ms(&self) -> u64 {
69 self.redeliver_interval_ms
70 }
71
72 pub fn unconfirmed_count(&self) -> usize {
73 self.inner.lock().unconfirmed.len()
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use super::*;
80
81 #[test]
82 fn confirms_remove_from_pending() {
83 let alod = AtLeastOnceDelivery::<String>::new(500, 5, 100);
84 let id = alod.deliver("dst", "hi".into()).unwrap();
85 assert_eq!(alod.unconfirmed_count(), 1);
86 assert!(alod.confirm_delivery(id));
87 assert_eq!(alod.unconfirmed_count(), 0);
88 }
89
90 #[test]
91 fn redeliver_yields_unconfirmed() {
92 let alod = AtLeastOnceDelivery::<u32>::new(500, 5, 100);
93 alod.deliver("a", 1);
94 alod.deliver("b", 2);
95 assert_eq!(alod.redeliver().len(), 2);
96 }
97}