buffer_trigger/buffer_trigger_async/general/
builder.rs

1use super::{General, Locker};
2use lifetime_thread::Outer;
3use std::{fmt, time::Duration};
4use tokio::sync::{mpsc::channel, Mutex, RwLock};
/// Builder for the general buffer trigger.
///
/// Type parameters:
/// * `E` - the element type passed to the accumulator,
/// * `C` - the container type that the accumulator fills and the consumer receives,
/// * `P` - the payload type handed to the length and container callbacks.
///
/// The struct itself carries no trait bounds; the `impl` blocks declare the
/// bounds they actually need.
pub struct Builder<E, C, P> {
    /// Optional payload handed (by reference) to the length and container
    /// callbacks below.
    payload: Option<P>,
    /// Name of the trigger; this is the only field shown in the `Debug` output.
    name: String,
    /// The function executed after the trigger condition is met.
    consumer: fn(C),
    /// Length threshold for the trigger.
    // NOTE(review): presumably compared against the value reported by
    // `get_len` inside `General` - confirm there.
    max_len: usize,
    /// The maximum time to wait after an element is saved.
    /// `None` means no time-based trigger is configured.
    interval: Option<Duration>,
    /// Returns the number of container elements.
    get_len: fn(&Option<P>) -> usize,
    /// Callback that may mutate the payload.
    // NOTE(review): by its name, expected to bump the element count after an
    // element is stored - confirm against the caller in `General`.
    incr_len: fn(&mut Option<P>),
    /// Callback that may mutate the payload.
    // NOTE(review): by its name, expected to reset the element count - confirm
    // against the caller in `General`.
    clear_len: fn(&mut Option<P>),
    /// Returns a mutable reference to the container, derived from the payload.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Accumulator function: receives the container and one element.
    accumulator: fn(&mut C, E),
    /// Gets and clears the container, returning it by value.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
30
31impl<E, C, P> fmt::Debug for Builder<E, C, P>
32where
33    P: fmt::Debug + Sync + Send,
34    E: fmt::Debug + Sync + Send,
35    C: fmt::Debug + Sync + Send,
36{
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        write!(f, "name {}", self.name)
39    }
40}
41
42impl<E, C, P> Builder<E, C, P>
43where
44    P: fmt::Debug + Sync + Send,
45    E: fmt::Debug + Sync + Send,
46    C: fmt::Debug + Sync + Send,
47{
48    /// init
49    #[must_use]
50    pub fn builder() -> Self {
51        Self {
52            payload: None,
53            name: "anonymous".to_owned(),
54            get_len: |_| 1,
55            incr_len: |_| {},
56            clear_len: |_| {},
57            get_container: |_| panic!(),
58            accumulator: |_, _| {},
59            get_and_clear_container: |_| panic!(),
60            consumer: |_| {},
61            max_len: std::usize::MAX,
62            interval: None,
63        }
64    }
65
66    /// set `name`
67    pub fn name(mut self, name: String) -> Self {
68        self.name = name;
69        self
70    }
71
72    /// set `get_len`
73    pub fn get_len(mut self, get_len: fn(&Option<P>) -> usize) -> Self {
74        self.get_len = get_len;
75        self
76    }
77
78    /// set `incr_len`
79    pub fn incr_len(mut self, incr_len: fn(&mut Option<P>)) -> Self {
80        self.incr_len = incr_len;
81        self
82    }
83    /// set `incr_len`
84    pub fn clear_len(mut self, clear_len: fn(&mut Option<P>)) -> Self {
85        self.clear_len = clear_len;
86        self
87    }
88
89    /// set `get_container`
90    pub fn get_container(mut self, get_container: fn(&mut Option<P>) -> &mut C) -> Self {
91        self.get_container = get_container;
92        self
93    }
94
95    /// set `get_and_clear_container`
96    pub fn get_and_clear_container(
97        mut self,
98        get_and_clear_container: fn(&mut Option<P>) -> C,
99    ) -> Self {
100        self.get_and_clear_container = get_and_clear_container;
101        self
102    }
103
104    /// set `accumulator`
105    pub fn accumulator(mut self, accumulator: fn(&mut C, E)) -> Self {
106        self.accumulator = accumulator;
107        self
108    }
109
110    /// set `consumer`
111    pub fn consumer(mut self, consumer: fn(C)) -> Self {
112        self.consumer = consumer;
113        self
114    }
115
116    /// set `max_len`
117    pub fn max_len(mut self, max_len: usize) -> Self {
118        self.max_len = max_len;
119        self
120    }
121
122    /// set `interval`
123    pub fn interval(mut self, interval: Duration) -> Self {
124        self.interval = Some(interval);
125        self
126    }
127
128    /// set `interval`
129    pub fn payload(mut self, payload: P) -> Self {
130        self.payload = Some(payload);
131        self
132    }
133
134    /// `build`
135    pub fn build(self) -> Outer<General<E, C, P>> {
136        let (sender, receiver) = channel(10);
137        let general = General {
138            name: self.name,
139            locker: RwLock::new(Locker {
140                get_len: self.get_len,
141                incr_len: self.incr_len,
142                clear_len: self.clear_len,
143                get_container: self.get_container,
144                get_and_clear_container: self.get_and_clear_container,
145                accumulator: self.accumulator,
146                clock: false,
147                payload: self.payload,
148            }),
149            consumer: self.consumer,
150            max_len: self.max_len,
151            interval: self.interval,
152            sender: Mutex::new(sender),
153            receiver: Mutex::new(receiver),
154        };
155        if self.interval.is_some() {
156            lifetime_thread::async_spawn(general, |inner| async move {
157                while let Some(g) = inner.get() {
158                    g.listen_clock_trigger().await
159                }
160            })
161        } else {
162            lifetime_thread::spawn(general, |_| {})
163        }
164    }
165}