buffer_trigger/buffer_trigger_sync/general/
builder.rs

1use super::{General, Locker};
2use lifetime_thread::Outer;
3use std::sync::{mpsc, Mutex, RwLock};
4use std::{fmt, time::Duration};
/// General buffer trigger builder.
///
/// Collects the configuration (callbacks, thresholds and an optional payload)
/// that `build` hands over to `General`.
///
/// No trait bounds are declared on the type itself; the `impl` blocks carry the
/// `fmt::Debug + Send` requirements they need.
pub struct Builder<E, C, P> {
    /// Optional user state passed to the callbacks below as `Option<P>`.
    payload: Option<P>,
    /// Name written by the `Debug` implementation.
    name: String,
    /// The function executed after the trigger condition is met.
    consumer: fn(C),
    /// Element-count threshold.
    ///
    /// NOTE(review): presumably the trigger fires once the buffered length
    /// reaches or exceeds this value; confirm against `General`.
    max_len: usize,
    /// The maximum time to wait after an element is saved.
    ///
    /// When `None`, `build` does not start the clock-trigger loop.
    interval: Option<Duration>,
    /// Reports the number of container elements.
    get_len: fn(&Option<P>) -> usize,
    /// NOTE(review): presumably bumps the element count after an element is
    /// stored; confirm against `General`.
    incr_len: fn(&mut Option<P>),
    /// NOTE(review): presumably resets the element count after the container
    /// is handed to the consumer; confirm against `General`.
    clear_len: fn(&mut Option<P>),
    /// Borrows the container `C` out of the payload.
    get_container: fn(&mut Option<P>) -> &mut C,
    /// Accumulator function: receives the container and one element `E`.
    accumulator: fn(&mut C, E),
    /// Returns the container by value.
    ///
    /// NOTE(review): per its name it also leaves the payload's container
    /// cleared; confirm against the callers in `General`.
    get_and_clear_container: fn(&mut Option<P>) -> C,
}
29
30impl<E, C, P> fmt::Debug for Builder<E, C, P>
31where
32    P: fmt::Debug + Send,
33    E: fmt::Debug + Send,
34    C: fmt::Debug + Send,
35{
36    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37        write!(f, "name {}", self.name)
38    }
39}
40
41impl<E, C, P> Builder<E, C, P>
42where
43    P: fmt::Debug + Send,
44    E: fmt::Debug + Send,
45    C: fmt::Debug + Send,
46{
47    /// init
48    #[must_use]
49    pub fn builder() -> Self {
50        Self {
51            payload: None,
52            name: "anonymous".to_owned(),
53            get_len: |_| 1,
54            incr_len: |_| {},
55            clear_len: |_| {},
56            get_container: |_| panic!(),
57            accumulator: |_, _| {},
58            get_and_clear_container: |_| panic!(),
59            consumer: |_| {},
60            max_len: std::usize::MAX,
61            interval: None,
62        }
63    }
64
65    /// set `name`
66    pub fn name(mut self, name: String) -> Self {
67        self.name = name;
68        self
69    }
70
71    /// set `get_len`
72    pub fn get_len(mut self, get_len: fn(&Option<P>) -> usize) -> Self {
73        self.get_len = get_len;
74        self
75    }
76
77    /// set `incr_len`
78    pub fn incr_len(mut self, incr_len: fn(&mut Option<P>)) -> Self {
79        self.incr_len = incr_len;
80        self
81    }
82    /// set `incr_len`
83    pub fn clear_len(mut self, clear_len: fn(&mut Option<P>)) -> Self {
84        self.clear_len = clear_len;
85        self
86    }
87
88    /// set `get_container`
89    pub fn get_container(mut self, get_container: fn(&mut Option<P>) -> &mut C) -> Self {
90        self.get_container = get_container;
91        self
92    }
93
94    /// set `get_and_clear_container`
95    pub fn get_and_clear_container(
96        mut self,
97        get_and_clear_container: fn(&mut Option<P>) -> C,
98    ) -> Self {
99        self.get_and_clear_container = get_and_clear_container;
100        self
101    }
102
103    /// set `accumulator`
104    pub fn accumulator(mut self, accumulator: fn(&mut C, E)) -> Self {
105        self.accumulator = accumulator;
106        self
107    }
108
109    /// set `consumer`
110    pub fn consumer(mut self, consumer: fn(C)) -> Self {
111        self.consumer = consumer;
112        self
113    }
114
115    /// set `max_len`
116    pub fn max_len(mut self, max_len: usize) -> Self {
117        self.max_len = max_len;
118        self
119    }
120
121    /// set `interval`
122    pub fn interval(mut self, interval: Duration) -> Self {
123        self.interval = Some(interval);
124        self
125    }
126
127    /// set `interval`
128    pub fn payload(mut self, payload: P) -> Self {
129        self.payload = Some(payload);
130        self
131    }
132
133    /// `build`
134    pub fn build(self) -> Outer<General<E, C, P>> {
135        let (sender, receiver) = mpsc::channel();
136        let general = General {
137            name: self.name,
138            locker: RwLock::new(Locker {
139                get_len: self.get_len,
140                incr_len: self.incr_len,
141                clear_len: self.clear_len,
142                get_container: self.get_container,
143                get_and_clear_container: self.get_and_clear_container,
144                accumulator: self.accumulator,
145                clock: false,
146                payload: self.payload,
147            }),
148            consumer: self.consumer,
149            max_len: self.max_len,
150            interval: self.interval,
151            sender: Mutex::new(sender),
152            receiver: Mutex::new(receiver),
153        };
154        if self.interval.is_some() {
155            lifetime_thread::spawn(general, |inner| {
156                while let Some(g) = inner.get() {
157                    g.listen_clock_trigger()
158                }
159            })
160        } else {
161            lifetime_thread::spawn(general, |_| {})
162        }
163    }
164}