1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
use actix::prelude::*; use std::{collections::HashSet, hash::Hash, mem::replace, time::Duration, time::Instant}; type CallbackFn<T> = Box<dyn Fn(HashSet<T>) + Send>; pub struct Aggregator<T: Eq + Hash> { buf: HashSet<T>, cb: CallbackFn<T>, debounce: Duration, max_delay: Duration, handle: Option<(SpawnHandle, Instant)>, } impl<T> Aggregator<T> where T: Eq + Hash + Unpin + 'static, { pub fn new( debounce: Duration, max_delay: Duration, callback_fn: CallbackFn<T>, ) -> Aggregator<T> { Self { buf: HashSet::new(), cb: callback_fn, debounce, max_delay, handle: None, } } pub fn extend(&mut self, payload: Vec<T>, ctx: &mut <Self as Actor>::Context) { self.buf.extend(payload); self.flush_later(ctx); } pub fn flush_later(&mut self, ctx: &mut <Self as Actor>::Context) { if let Some((handle, start)) = replace(&mut self.handle, None) { ctx.cancel_future(handle); if start.elapsed() >= self.max_delay { ctx.notify(AggregatorCmd::Flush); } else { self.handle = Some((ctx.notify_later(AggregatorCmd::Flush, self.debounce), start)) } } else { self.handle = Some(( ctx.notify_later(AggregatorCmd::Flush, self.debounce), Instant::now(), )); } } pub fn flush(&mut self) { self.handle.take(); if !self.buf.is_empty() { (self.cb)(replace(&mut self.buf, HashSet::new())) } } } impl<T> Actor for Aggregator<T> where T: Eq + Hash + Unpin + 'static, { type Context = Context<Self>; } impl<T> Supervised for Aggregator<T> where T: Eq + Hash + Unpin + 'static, { fn restarting(&mut self, _: &mut Self::Context) {} } #[derive(Message)] #[rtype(result = "()")] pub enum AggregatorCmd<T: Eq + Hash> { NewData(Vec<T>), Flush, } impl<T> Handler<AggregatorCmd<T>> for Aggregator<T> where T: Eq + Hash + Unpin + 'static, { type Result = (); fn handle(&mut self, msg: AggregatorCmd<T>, ctx: &mut Self::Context) -> Self::Result { match msg { AggregatorCmd::NewData(payload) => self.extend(payload, ctx), AggregatorCmd::Flush => self.flush(), } } } #[cfg(test)] mod tests { #[test] fn it_works() { assert_eq!(2 + 2, 4); } }