atomr-persistence 0.3.0

Event sourcing for atomr — `Eventsourced` trait, recovery permitter, async snapshotter, persistent FSM, at-least-once delivery.
Documentation
//! At-least-once delivery.

use std::collections::BTreeMap;

use parking_lot::Mutex;

#[derive(Debug, Clone)]
pub struct UnconfirmedDelivery<M> {
    pub delivery_id: u64,
    pub destination: String,
    pub message: M,
}

pub struct AtLeastOnceDelivery<M: Clone + Send + 'static> {
    inner: Mutex<Inner<M>>,
    redeliver_interval_ms: u64,
    warn_after_attempts: u32,
    max_unconfirmed: usize,
}

struct Inner<M: Clone + Send + 'static> {
    next_id: u64,
    unconfirmed: BTreeMap<u64, (UnconfirmedDelivery<M>, u32)>,
}

impl<M: Clone + Send + 'static> AtLeastOnceDelivery<M> {
    pub fn new(redeliver_interval_ms: u64, warn_after: u32, max_unconfirmed: usize) -> Self {
        Self {
            inner: Mutex::new(Inner { next_id: 0, unconfirmed: BTreeMap::new() }),
            redeliver_interval_ms,
            warn_after_attempts: warn_after,
            max_unconfirmed,
        }
    }

    pub fn deliver(&self, destination: impl Into<String>, message: M) -> Option<u64> {
        let mut inner = self.inner.lock();
        if inner.unconfirmed.len() >= self.max_unconfirmed {
            return None;
        }
        inner.next_id += 1;
        let id = inner.next_id;
        inner.unconfirmed.insert(
            id,
            (UnconfirmedDelivery { delivery_id: id, destination: destination.into(), message }, 0),
        );
        Some(id)
    }

    pub fn confirm_delivery(&self, id: u64) -> bool {
        self.inner.lock().unconfirmed.remove(&id).is_some()
    }

    pub fn redeliver(&self) -> Vec<UnconfirmedDelivery<M>> {
        let mut inner = self.inner.lock();
        let mut out = Vec::new();
        for (_, (d, attempts)) in inner.unconfirmed.iter_mut() {
            *attempts += 1;
            out.push(d.clone());
        }
        out
    }

    pub fn warn_threshold(&self) -> u32 {
        self.warn_after_attempts
    }

    pub fn redeliver_interval_ms(&self) -> u64 {
        self.redeliver_interval_ms
    }

    pub fn unconfirmed_count(&self) -> usize {
        self.inner.lock().unconfirmed.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn confirms_remove_from_pending() {
        let alod = AtLeastOnceDelivery::<String>::new(500, 5, 100);
        let id = alod.deliver("dst", "hi".into()).unwrap();
        assert_eq!(alod.unconfirmed_count(), 1);
        assert!(alod.confirm_delivery(id));
        assert_eq!(alod.unconfirmed_count(), 0);
    }

    #[test]
    fn redeliver_yields_unconfirmed() {
        let alod = AtLeastOnceDelivery::<u32>::new(500, 5, 100);
        alod.deliver("a", 1);
        alod.deliver("b", 2);
        assert_eq!(alod.redeliver().len(), 2);
    }
}