buffer_trigger/buffer_trigger_async/general/
mod.rs

1use std::{fmt, time::Duration};
2use tokio::{
3    sync::{
4        mpsc::{Receiver, Sender},
5        Mutex, RwLock,
6    },
7    time::sleep,
8};
9
10pub mod builder;
11struct Locker<E, C, P>
12where
13    P: fmt::Debug,
14    E: fmt::Debug,
15    C: fmt::Debug,
16{
17    payload: Option<P>,
18    /// Whether the timed task has been set
19    clock: bool,
20    /// Number of container elements
21    get_len: fn(&Option<P>) -> usize,
22
23    incr_len: fn(&mut Option<P>),
24
25    clear_len: fn(&mut Option<P>),
26
27    get_container: fn(&mut Option<P>) -> &mut C,
28    /// accumulator function
29    accumulator: fn(&mut C, E),
30    /// get and clear container
31    get_and_clear_container: fn(&mut Option<P>) -> C,
32}
33
34/// General `BufferTrigger`
35///
36/// Set your own container to store in the current service
37pub struct General<E, C, P>
38where
39    P: fmt::Debug + Sync + Send + 'static,
40    E: fmt::Debug + Sync + Send + 'static,
41    C: fmt::Debug + Sync + Send + 'static,
42{
43    name: String,
44    locker: RwLock<Locker<E, C, P>>,
45    /// The function executed after the trigger condition is met.
46    consumer: fn(C),
47    /// how many elements are exceeded
48    max_len: usize,
49    /// The maximum time to wait after an element is saved.
50    interval: Option<Duration>,
51    sender: Mutex<Sender<()>>,
52    receiver: Mutex<Receiver<()>>,
53}
54
55impl<E, C, P> fmt::Debug for General<E, C, P>
56where
57    P: fmt::Debug + Sync + Send,
58    E: fmt::Debug + Sync + Send,
59    C: fmt::Debug + Sync + Send,
60{
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "name {}", self.name)
63    }
64}
65
66impl<E, C, P> General<E, C, P>
67where
68    P: fmt::Debug + Sync + Send,
69    E: fmt::Debug + Sync + Send,
70    C: fmt::Debug + Sync + Send,
71{
72    pub async fn len(&self) -> usize {
73        let c = self.locker.read().await;
74        (c.get_len)(&c.payload)
75    }
76    pub async fn push(&self, value: E) {
77        {
78            let mut c = self.locker.write().await;
79            (c.incr_len)(&mut c.payload);
80            (c.accumulator)((c.get_container)(&mut c.payload), value);
81            if let (false, Some(dur)) = (c.clock, self.interval) {
82                c.clock = true;
83                let sender = self.sender.lock().await.clone();
84                let _ = tokio::spawn(async move {
85                    sleep(dur).await;
86                    sender.send(()).await
87                });
88            }
89        }
90        if self.len().await >= self.max_len {
91            self.trigger().await
92        }
93    }
94
95    pub async fn trigger(&self) {
96        if !self.is_empty().await {
97            let mut c = self.locker.write().await;
98            c.clock = false;
99            (c.clear_len)(&mut c.payload);
100            (self.consumer)((c.get_and_clear_container)(&mut c.payload));
101        }
102    }
103
104    pub async fn is_empty(&self) -> bool {
105        self.len().await == 0
106    }
107
108    /// start clock trigger listener
109    pub async fn listen_clock_trigger(&self) {
110        log::info!("{:?} listen_clock_trigger", self);
111        while self.receiver.lock().await.recv().await.is_some() {
112            let clock = self.locker.read().await.clock;
113            if clock {
114                self.trigger().await;
115            }
116        }
117    }
118}
119
120impl<E, C, P> Drop for General<E, C, P>
121where
122    P: fmt::Debug + Sync + Send,
123    E: fmt::Debug + Sync + Send,
124    C: fmt::Debug + Sync + Send,
125{
126    fn drop(&mut self) {
127        let _ = self.trigger();
128    }
129}