buffer_trigger/buffer_trigger_sync/general/
mod.rs

1use super::BufferTrigger;
2use std::sync::{
3    mpsc::{Receiver, Sender},
4    Mutex, RwLock,
5};
6use std::thread;
7use std::{fmt, time::Duration};
8
9pub mod builder;
/// Mutable buffer state kept behind the owning buffer's lock.
///
/// The payload is opaque to this type: every operation on it goes through
/// one of the function pointers stored alongside it.
struct Locker<E: fmt::Debug, C: fmt::Debug, P: fmt::Debug> {
    /// Opaque state that all the function pointers below operate on.
    payload: Option<P>,
    /// `true` while a timed trigger has been scheduled for the current batch.
    clock: bool,
    /// Reports how many elements the payload currently accounts for.
    get_len: fn(&Option<P>) -> usize,
    /// Bumps the element count; invoked once for every pushed element.
    incr_len: fn(&mut Option<P>),
    /// Resets the element count; invoked when the buffer is flushed.
    clear_len: fn(&mut Option<P>),
    /// Borrows the container stored in the payload so an element can be added.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Folds a single element into the container.
    accumulator: fn(&mut C, E),
    /// Moves the container out of the payload so it can be consumed.
    /// NOTE(review): presumably leaves an empty container behind - confirm
    /// against the builder that supplies this function.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
31
32/// General `BufferTrigger`
33///
34/// Set your own container to store in the current service
35pub struct General<E, C, P>
36where
37    P: fmt::Debug + Send + 'static,
38    E: fmt::Debug + Send + 'static,
39    C: fmt::Debug + Send + 'static,
40{
41    name: String,
42    locker: RwLock<Locker<E, C, P>>,
43    /// The function executed after the trigger condition is met.
44    consumer: fn(C),
45    /// how many elements are exceeded
46    max_len: usize,
47    /// The maximum time to wait after an element is saved.
48    interval: Option<Duration>,
49    sender: Mutex<Sender<()>>,
50    receiver: Mutex<Receiver<()>>,
51}
52
53impl<E, C, P> fmt::Debug for General<E, C, P>
54where
55    P: fmt::Debug + Send + 'static,
56    E: fmt::Debug + Send + 'static,
57    C: fmt::Debug + Send + 'static,
58{
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        write!(f, "name {}", self.name)
61    }
62}
63
64impl<E, C, P> super::BufferTrigger<E> for General<E, C, P>
65where
66    P: fmt::Debug + Send,
67    E: fmt::Debug + Send,
68    C: fmt::Debug + Send,
69{
70    fn len(&self) -> usize {
71        if let Ok(c) = self.locker.read() {
72            (c.get_len)(&c.payload)
73        } else {
74            0
75        }
76    }
77    fn push(&self, value: E) {
78        if let Ok(mut c) = self.locker.write() {
79            (c.incr_len)(&mut c.payload);
80            (c.accumulator)((c.get_container)(&mut c.payload), value);
81            if let (false, Some(dur)) = (c.clock, self.interval) {
82                c.clock = true;
83                match self.sender.lock() {
84                    Ok(sender) => {
85                        let sender = sender.clone();
86                        let _ = thread::spawn(move || {
87                            thread::sleep(dur);
88                            if let Err(e) = sender.send(()) {
89                                log::error!("auto clock trigger error {}", e);
90                            };
91                        });
92                    }
93                    Err(e) => {
94                        log::error!("{}", e);
95                    }
96                }
97            }
98        }
99        if self.len() >= self.max_len {
100            self.trigger()
101        }
102    }
103
104    fn trigger(&self) {
105        if !self.is_empty() {
106            if let Ok(mut c) = self.locker.write() {
107                c.clock = false;
108                (c.clear_len)(&mut c.payload);
109                (self.consumer)((c.get_and_clear_container)(&mut c.payload));
110            }
111        }
112    }
113
114    fn is_empty(&self) -> bool {
115        self.len() == 0
116    }
117}
118impl<E, C, P> General<E, C, P>
119where
120    P: fmt::Debug + Send,
121    E: fmt::Debug + Send,
122    C: fmt::Debug + Send,
123{
124    /// start clock trigger listener
125    fn listen_clock_trigger(&self) {
126        log::info!("{:?} listen_clock_trigger", self);
127        while let Ok(recevier) = self.receiver.lock() {
128            if recevier.recv().is_ok() {
129                let clock = if let Ok(c) = self.locker.read() {
130                    c.clock
131                } else {
132                    false
133                };
134                if clock {
135                    self.trigger();
136                }
137            }
138        }
139    }
140}
141impl<E, C, P> Drop for General<E, C, P>
142where
143    P: fmt::Debug + Send,
144    E: fmt::Debug + Send,
145    C: fmt::Debug + Send,
146{
147    fn drop(&mut self) {
148        self.trigger();
149    }
150}