buffer_trigger/buffer_trigger_async/general/
builder.rs1use super::{General, Locker};
2use lifetime_thread::Outer;
3use std::{fmt, time::Duration};
4use tokio::sync::{mpsc::channel, Mutex, RwLock};
/// Settings collected before a `General` buffer trigger is built.
///
/// Every field is either plain data or a plain `fn` pointer, so the struct
/// itself places no trait requirements on its type parameters; the `impl`
/// blocks state the bounds they actually need.
///
/// * `E` - element type passed to `accumulator`.
/// * `C` - container type passed to `consumer`.
/// * `P` - payload type; held as `Option<P>` and passed to the length and
///   container callbacks.
pub struct Builder<E, C, P> {
    /// Optional payload; the length and container callbacks receive it as
    /// `Option<P>`.
    payload: Option<P>,
    /// Name given to the trigger.
    name: String,
    /// Callback that takes ownership of a container `C`.
    /// NOTE(review): presumably invoked when the buffer is flushed - confirm
    /// in `General`.
    consumer: fn(C),
    /// Length limit.
    /// NOTE(review): presumably the buffered length at which a flush is
    /// triggered - confirm in `General`.
    max_len: usize,
    /// Optional timer period; `None` when no interval has been configured.
    interval: Option<Duration>,
    /// Reports a length computed from the payload.
    get_len: fn(&Option<P>) -> usize,
    /// Mutates the payload.
    /// NOTE(review): by its name, records one more buffered element - confirm.
    incr_len: fn(&mut Option<P>),
    /// Mutates the payload.
    /// NOTE(review): by its name, resets the recorded length - confirm.
    clear_len: fn(&mut Option<P>),
    /// Borrows the container `C` out of the payload.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Folds one element `E` into a container `C`.
    accumulator: fn(&mut C, E),
    /// Produces an owned container `C` from the payload.
    /// NOTE(review): by its name, also leaves the payload emptied - confirm.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
30
31impl<E, C, P> fmt::Debug for Builder<E, C, P>
32where
33 P: fmt::Debug + Sync + Send,
34 E: fmt::Debug + Sync + Send,
35 C: fmt::Debug + Sync + Send,
36{
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 write!(f, "name {}", self.name)
39 }
40}
41
42impl<E, C, P> Builder<E, C, P>
43where
44 P: fmt::Debug + Sync + Send,
45 E: fmt::Debug + Sync + Send,
46 C: fmt::Debug + Sync + Send,
47{
48 #[must_use]
50 pub fn builder() -> Self {
51 Self {
52 payload: None,
53 name: "anonymous".to_owned(),
54 get_len: |_| 1,
55 incr_len: |_| {},
56 clear_len: |_| {},
57 get_container: |_| panic!(),
58 accumulator: |_, _| {},
59 get_and_clear_container: |_| panic!(),
60 consumer: |_| {},
61 max_len: std::usize::MAX,
62 interval: None,
63 }
64 }
65
66 pub fn name(mut self, name: String) -> Self {
68 self.name = name;
69 self
70 }
71
72 pub fn get_len(mut self, get_len: fn(&Option<P>) -> usize) -> Self {
74 self.get_len = get_len;
75 self
76 }
77
78 pub fn incr_len(mut self, incr_len: fn(&mut Option<P>)) -> Self {
80 self.incr_len = incr_len;
81 self
82 }
83 pub fn clear_len(mut self, clear_len: fn(&mut Option<P>)) -> Self {
85 self.clear_len = clear_len;
86 self
87 }
88
89 pub fn get_container(mut self, get_container: fn(&mut Option<P>) -> &mut C) -> Self {
91 self.get_container = get_container;
92 self
93 }
94
95 pub fn get_and_clear_container(
97 mut self,
98 get_and_clear_container: fn(&mut Option<P>) -> C,
99 ) -> Self {
100 self.get_and_clear_container = get_and_clear_container;
101 self
102 }
103
104 pub fn accumulator(mut self, accumulator: fn(&mut C, E)) -> Self {
106 self.accumulator = accumulator;
107 self
108 }
109
110 pub fn consumer(mut self, consumer: fn(C)) -> Self {
112 self.consumer = consumer;
113 self
114 }
115
116 pub fn max_len(mut self, max_len: usize) -> Self {
118 self.max_len = max_len;
119 self
120 }
121
122 pub fn interval(mut self, interval: Duration) -> Self {
124 self.interval = Some(interval);
125 self
126 }
127
128 pub fn payload(mut self, payload: P) -> Self {
130 self.payload = Some(payload);
131 self
132 }
133
134 pub fn build(self) -> Outer<General<E, C, P>> {
136 let (sender, receiver) = channel(10);
137 let general = General {
138 name: self.name,
139 locker: RwLock::new(Locker {
140 get_len: self.get_len,
141 incr_len: self.incr_len,
142 clear_len: self.clear_len,
143 get_container: self.get_container,
144 get_and_clear_container: self.get_and_clear_container,
145 accumulator: self.accumulator,
146 clock: false,
147 payload: self.payload,
148 }),
149 consumer: self.consumer,
150 max_len: self.max_len,
151 interval: self.interval,
152 sender: Mutex::new(sender),
153 receiver: Mutex::new(receiver),
154 };
155 if self.interval.is_some() {
156 lifetime_thread::async_spawn(general, |inner| async move {
157 while let Some(g) = inner.get() {
158 g.listen_clock_trigger().await
159 }
160 })
161 } else {
162 lifetime_thread::spawn(general, |_| {})
163 }
164 }
165}