buffer_trigger/buffer_trigger_sync/general/
mod.rs1use super::BufferTrigger;
2use std::sync::{
3 mpsc::{Receiver, Sender},
4 Mutex, RwLock,
5};
6use std::thread;
7use std::{fmt, time::Duration};
8
9pub mod builder;
/// Mutable state guarded by the buffer's lock, together with the
/// caller-supplied function pointers that know how to operate on it.
///
/// `P` is the opaque payload, `C` the container handed to the consumer and
/// `E` the element type folded into the container.
struct Locker<E: fmt::Debug, C: fmt::Debug, P: fmt::Debug> {
    /// Payload that every function pointer below receives.
    payload: Option<P>,
    /// `true` while a timer tick is pending: set by `push` when it arms the
    /// timer, reset by `trigger` when the buffer is flushed.
    clock: bool,
    /// Reports how many elements the payload currently holds.
    get_len: fn(&Option<P>) -> usize,

    /// Bumps the payload's element count; called by `push` before the
    /// element is accumulated.
    incr_len: fn(&mut Option<P>),

    /// Resets the payload's element count; called by `trigger` before the
    /// container is taken.
    clear_len: fn(&mut Option<P>),

    /// Borrows the container stored in the payload so an element can be
    /// accumulated into it.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Folds one element into the container.
    accumulator: fn(&mut C, E),
    /// Hands the container out by value so `trigger` can pass it to the
    /// consumer (by its name it also resets the stored one — the
    /// implementation is supplied by the caller).
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
31
32pub struct General<E, C, P>
36where
37 P: fmt::Debug + Send + 'static,
38 E: fmt::Debug + Send + 'static,
39 C: fmt::Debug + Send + 'static,
40{
41 name: String,
42 locker: RwLock<Locker<E, C, P>>,
43 consumer: fn(C),
45 max_len: usize,
47 interval: Option<Duration>,
49 sender: Mutex<Sender<()>>,
50 receiver: Mutex<Receiver<()>>,
51}
52
53impl<E, C, P> fmt::Debug for General<E, C, P>
54where
55 P: fmt::Debug + Send + 'static,
56 E: fmt::Debug + Send + 'static,
57 C: fmt::Debug + Send + 'static,
58{
59 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60 write!(f, "name {}", self.name)
61 }
62}
63
64impl<E, C, P> super::BufferTrigger<E> for General<E, C, P>
65where
66 P: fmt::Debug + Send,
67 E: fmt::Debug + Send,
68 C: fmt::Debug + Send,
69{
70 fn len(&self) -> usize {
71 if let Ok(c) = self.locker.read() {
72 (c.get_len)(&c.payload)
73 } else {
74 0
75 }
76 }
77 fn push(&self, value: E) {
78 if let Ok(mut c) = self.locker.write() {
79 (c.incr_len)(&mut c.payload);
80 (c.accumulator)((c.get_container)(&mut c.payload), value);
81 if let (false, Some(dur)) = (c.clock, self.interval) {
82 c.clock = true;
83 match self.sender.lock() {
84 Ok(sender) => {
85 let sender = sender.clone();
86 let _ = thread::spawn(move || {
87 thread::sleep(dur);
88 if let Err(e) = sender.send(()) {
89 log::error!("auto clock trigger error {}", e);
90 };
91 });
92 }
93 Err(e) => {
94 log::error!("{}", e);
95 }
96 }
97 }
98 }
99 if self.len() >= self.max_len {
100 self.trigger()
101 }
102 }
103
104 fn trigger(&self) {
105 if !self.is_empty() {
106 if let Ok(mut c) = self.locker.write() {
107 c.clock = false;
108 (c.clear_len)(&mut c.payload);
109 (self.consumer)((c.get_and_clear_container)(&mut c.payload));
110 }
111 }
112 }
113
114 fn is_empty(&self) -> bool {
115 self.len() == 0
116 }
117}
118impl<E, C, P> General<E, C, P>
119where
120 P: fmt::Debug + Send,
121 E: fmt::Debug + Send,
122 C: fmt::Debug + Send,
123{
124 fn listen_clock_trigger(&self) {
126 log::info!("{:?} listen_clock_trigger", self);
127 while let Ok(recevier) = self.receiver.lock() {
128 if recevier.recv().is_ok() {
129 let clock = if let Ok(c) = self.locker.read() {
130 c.clock
131 } else {
132 false
133 };
134 if clock {
135 self.trigger();
136 }
137 }
138 }
139 }
140}
141impl<E, C, P> Drop for General<E, C, P>
142where
143 P: fmt::Debug + Send,
144 E: fmt::Debug + Send,
145 C: fmt::Debug + Send,
146{
147 fn drop(&mut self) {
148 self.trigger();
149 }
150}