1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
use actix::prelude::*;

use std::{collections::HashSet, hash::Hash, mem::replace, time::Duration, time::Instant};

type CallbackFn<T> = Box<dyn Fn(HashSet<T>) + Send>;

pub struct Aggregator<T: Eq + Hash> {
    buf: HashSet<T>,
    cb: CallbackFn<T>,
    debounce: Duration,
    max_delay: Duration,
    handle: Option<(SpawnHandle, Instant)>,
}

impl<T> Aggregator<T>
where
    T: Eq + Hash + Unpin + 'static,
{
    pub fn new(
        debounce: Duration,
        max_delay: Duration,
        callback_fn: CallbackFn<T>,
    ) -> Aggregator<T> {
        Self {
            buf: HashSet::new(),
            cb: callback_fn,
            debounce,
            max_delay,
            handle: None,
        }
    }

    pub fn extend(&mut self, payload: Vec<T>, ctx: &mut <Self as Actor>::Context) {
        self.buf.extend(payload);
        self.flush_later(ctx);
    }

    pub fn flush_later(&mut self, ctx: &mut <Self as Actor>::Context) {
        if let Some((handle, start)) = replace(&mut self.handle, None) {
            ctx.cancel_future(handle);
            if start.elapsed() >= self.max_delay {
                ctx.notify(AggregatorCmd::Flush);
            } else {
                self.handle = Some((ctx.notify_later(AggregatorCmd::Flush, self.debounce), start))
            }
        } else {
            self.handle = Some((
                ctx.notify_later(AggregatorCmd::Flush, self.debounce),
                Instant::now(),
            ));
        }
    }

    pub fn flush(&mut self) {
        self.handle.take();
        if !self.buf.is_empty() {
            (self.cb)(replace(&mut self.buf, HashSet::new()))
        }
    }
}

impl<T> Actor for Aggregator<T>
where
    T: Eq + Hash + Unpin + 'static,
{
    type Context = Context<Self>;
}

impl<T> Supervised for Aggregator<T>
where
    T: Eq + Hash + Unpin + 'static,
{
    fn restarting(&mut self, _: &mut Self::Context) {}
}

#[derive(Message)]
#[rtype(result = "()")]
pub enum AggregatorCmd<T: Eq + Hash> {
    NewData(Vec<T>),
    Flush,
}

impl<T> Handler<AggregatorCmd<T>> for Aggregator<T>
where
    T: Eq + Hash + Unpin + 'static,
{
    type Result = ();

    fn handle(&mut self, msg: AggregatorCmd<T>, ctx: &mut Self::Context) -> Self::Result {
        match msg {
            AggregatorCmd::NewData(payload) => self.extend(payload, ctx),
            AggregatorCmd::Flush => self.flush(),
        }
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }
}