buffer_trigger/buffer_trigger_async/general/
mod.rs1use std::{fmt, time::Duration};
2use tokio::{
3 sync::{
4 mpsc::{Receiver, Sender},
5 Mutex, RwLock,
6 },
7 time::sleep,
8};
9
10pub mod builder;
/// Mutable buffer state together with the function pointers used to operate on it.
///
/// `E` is the type of a single pushed element, `C` is the container the
/// elements are accumulated into, and `P` is the payload that all of the
/// function pointers below read from or write to.
struct Locker<E, C, P>
where
    E: fmt::Debug,
    C: fmt::Debug,
    P: fmt::Debug,
{
    /// Payload handed to every function pointer in this struct.
    payload: Option<P>,
    /// Flag marking that a timer has been started for the current batch.
    clock: bool,
    /// Reports the current length recorded for the payload.
    get_len: fn(&Option<P>) -> usize,
    /// Called once per pushed element to bump the recorded length.
    incr_len: fn(&mut Option<P>),
    /// Called when the buffer is flushed to reset the recorded length.
    clear_len: fn(&mut Option<P>),
    /// Borrows the container stored for the payload.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Folds one element into the container.
    accumulator: fn(&mut C, E),
    /// Hands out the container as an owned value when the buffer is flushed.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
33
34pub struct General<E, C, P>
38where
39 P: fmt::Debug + Sync + Send + 'static,
40 E: fmt::Debug + Sync + Send + 'static,
41 C: fmt::Debug + Sync + Send + 'static,
42{
43 name: String,
44 locker: RwLock<Locker<E, C, P>>,
45 consumer: fn(C),
47 max_len: usize,
49 interval: Option<Duration>,
51 sender: Mutex<Sender<()>>,
52 receiver: Mutex<Receiver<()>>,
53}
54
55impl<E, C, P> fmt::Debug for General<E, C, P>
56where
57 P: fmt::Debug + Sync + Send,
58 E: fmt::Debug + Sync + Send,
59 C: fmt::Debug + Sync + Send,
60{
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "name {}", self.name)
63 }
64}
65
66impl<E, C, P> General<E, C, P>
67where
68 P: fmt::Debug + Sync + Send,
69 E: fmt::Debug + Sync + Send,
70 C: fmt::Debug + Sync + Send,
71{
72 pub async fn len(&self) -> usize {
73 let c = self.locker.read().await;
74 (c.get_len)(&c.payload)
75 }
76 pub async fn push(&self, value: E) {
77 {
78 let mut c = self.locker.write().await;
79 (c.incr_len)(&mut c.payload);
80 (c.accumulator)((c.get_container)(&mut c.payload), value);
81 if let (false, Some(dur)) = (c.clock, self.interval) {
82 c.clock = true;
83 let sender = self.sender.lock().await.clone();
84 let _ = tokio::spawn(async move {
85 sleep(dur).await;
86 sender.send(()).await
87 });
88 }
89 }
90 if self.len().await >= self.max_len {
91 self.trigger().await
92 }
93 }
94
95 pub async fn trigger(&self) {
96 if !self.is_empty().await {
97 let mut c = self.locker.write().await;
98 c.clock = false;
99 (c.clear_len)(&mut c.payload);
100 (self.consumer)((c.get_and_clear_container)(&mut c.payload));
101 }
102 }
103
104 pub async fn is_empty(&self) -> bool {
105 self.len().await == 0
106 }
107
108 pub async fn listen_clock_trigger(&self) {
110 log::info!("{:?} listen_clock_trigger", self);
111 while self.receiver.lock().await.recv().await.is_some() {
112 let clock = self.locker.read().await.clock;
113 if clock {
114 self.trigger().await;
115 }
116 }
117 }
118}
119
120impl<E, C, P> Drop for General<E, C, P>
121where
122 P: fmt::Debug + Sync + Send,
123 E: fmt::Debug + Sync + Send,
124 C: fmt::Debug + Sync + Send,
125{
126 fn drop(&mut self) {
127 let _ = self.trigger();
128 }
129}