1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use async_std::{
    sync::{Mutex, Receiver, RwLock, Sender},
    task,
};
use std::{fmt, time::Duration};

pub mod builder;
/// Lock-protected state of a buffer trigger.
///
/// The buffered data lives in `payload`; every operation on it goes through
/// the function pointers stored alongside, so the concrete payload layout is
/// decided by whoever constructs the value.
struct Locker<E: fmt::Debug, C: fmt::Debug, P: fmt::Debug> {
    /// Backing storage handed to every callback below.
    payload: Option<P>,
    /// `true` while a timed flush has been scheduled and not yet consumed.
    clock: bool,
    /// Reports how many elements the payload currently holds.
    get_len: fn(&Option<P>) -> usize,

    /// Bumps the element count stored in the payload.
    incr_len: fn(&mut Option<P>),

    /// Resets the element count stored in the payload.
    clear_len: fn(&mut Option<P>),

    /// Borrows the container held inside the payload.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Folds one element of type `E` into the container.
    accumulator: fn(&mut C, E),
    /// Takes the container out of the payload, leaving it cleared.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}

/// A general-purpose `BufferTrigger`.
///
/// Elements are accumulated into a caller-chosen container kept inside this
/// value and handed to `consumer` whenever a flush is triggered.
pub struct General<
    E: fmt::Debug + Send + Sync,
    C: fmt::Debug + Send + Sync,
    P: fmt::Debug + Send + Sync,
> {
    /// Label printed by the `Debug` implementation.
    name: String,
    /// Buffered payload plus the callbacks that operate on it.
    locker: RwLock<Locker<E, C, P>>,
    /// Called with the drained container each time the buffer is flushed.
    consumer: fn(C),
    /// Element count at which `push` flushes the buffer.
    max_len: usize,
    /// Delay between the first buffered element and the timed flush signal;
    /// `None` disables timed flushing.
    interval: Option<Duration>,
    /// Sending half of the channel used by timer tasks to request a flush.
    sender: Mutex<Sender<()>>,
    /// Receiving half of that channel, drained by `listen_clock_trigger`.
    receiver: Mutex<Receiver<()>>,
}

impl<E, C, P> fmt::Debug for General<E, C, P>
where
    E: fmt::Debug + Send + Sync,
    C: fmt::Debug + Send + Sync,
    P: fmt::Debug + Send + Sync,
{
    /// Formats the trigger as `name <name>`; no other field is printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = &self.name;
        f.write_fmt(format_args!("name {}", label))
    }
}

impl<E, C, P> General<E, C, P>
where
    P: fmt::Debug + Sync + Send,
    E: fmt::Debug + Sync + Send,
    C: fmt::Debug + Sync + Send,
{
    /// Returns the number of buffered elements, as reported by the payload's
    /// `get_len` callback while holding a read lock.
    pub async fn len(&self) -> usize {
        let c = self.locker.read().await;
        (c.get_len)(&c.payload)
    }

    /// Adds `value` to the buffer.
    ///
    /// If an `interval` is configured and no timed flush is pending, a task
    /// is spawned that sleeps for `interval` and then signals the clock
    /// channel. Once the element has been stored, the buffer is flushed
    /// immediately if its length has reached `max_len`.
    pub async fn push(&self, value: E) {
        {
            let mut c = self.locker.write().await;
            (c.incr_len)(&mut c.payload);
            (c.accumulator)((c.get_container)(&mut c.payload), value);
            if let (false, Some(dur)) = (c.clock, self.interval) {
                c.clock = true;
                let sender = self.sender.lock().await.clone();
                // The task is detached on purpose: it only has to deliver
                // the wake-up signal after the delay.
                let _ = task::spawn(async move {
                    task::sleep(dur).await;
                    sender.send(()).await
                });
            }
            // The write guard is released here, before the length check
            // below, because `trigger` takes the write lock itself.
        }
        if self.len().await >= self.max_len {
            self.trigger().await
        }
    }

    /// Flushes the buffer: if it holds at least one element, the clock flag
    /// and length are reset and the drained container is passed to the
    /// consumer. Does nothing when the buffer is empty.
    pub async fn trigger(&self) {
        // Check emptiness under the same write lock that performs the flush.
        // Checking first under a separate read lock would let a concurrent
        // `trigger` drain the buffer in between, and the consumer would then
        // be invoked with an already-emptied container.
        let mut c = self.locker.write().await;
        if (c.get_len)(&c.payload) == 0 {
            return;
        }
        c.clock = false;
        (c.clear_len)(&mut c.payload);
        (self.consumer)((c.get_and_clear_container)(&mut c.payload));
    }

    /// Returns `true` when no elements are buffered.
    pub async fn is_empty(&self) -> bool {
        self.len().await == 0
    }

    /// Runs the clock-trigger listener.
    ///
    /// Waits for signals on the clock channel and flushes the buffer for each
    /// one that arrives while the clock flag is still set. The loop ends when
    /// receiving from the channel fails.
    pub async fn listen_clock_trigger(&self) {
        log::info!("{:?} listen_clock_trigger", self);
        while self.receiver.lock().await.recv().await.is_ok() {
            // A cleared flag means the batch this signal was scheduled for
            // has already been flushed, so the signal is ignored.
            let clock = self.locker.read().await.clock;
            if clock {
                self.trigger().await;
            }
        }
    }
}

impl<E, C, P> Drop for General<E, C, P>
where
    P: fmt::Debug + Sync + Send,
    E: fmt::Debug + Sync + Send,
    C: fmt::Debug + Sync + Send,
{
    /// Flushes any elements still buffered to the consumer before the
    /// trigger is destroyed.
    fn drop(&mut self) {
        // `trigger()` is an `async fn`; calling it without awaiting only
        // builds a future that is dropped unpolled, so nothing would be
        // flushed. `&mut self` guarantees exclusive access, so the lock's
        // contents can be reached synchronously through `get_mut` and the
        // flush performed inline without an executor.
        let locker = self.locker.get_mut();
        if (locker.get_len)(&locker.payload) != 0 {
            locker.clock = false;
            (locker.clear_len)(&mut locker.payload);
            (self.consumer)((locker.get_and_clear_container)(&mut locker.payload));
        }
    }
}