1mod err;
5
6use std::{
7 sync::{Arc, Weak},
8 thread,
9 time::{Duration, Instant}
10};
11
12use ump_ng::{Client, WeakClient};
13
14use parking_lot::{Condvar, Mutex};
15
16pub use err::Error;
17
18
19pub enum Proc<P> {
21 PostReset(P),
23
24 PostRemove(P),
26
27 Remove
29}
30
31struct Record<I, P> {
33 ident: I,
34 dur: Duration,
35 at: Instant,
36 cb: Box<dyn FnMut() -> Proc<P> + Send>
37}
38
39
40struct Inner<I, P> {
41 shutdown: bool,
42 dirty: bool,
43 posters: Vec<Record<I, P>>
44}
45
46impl<I, P> Default for Inner<I, P> {
47 fn default() -> Self {
48 Self {
49 shutdown: false,
50 dirty: false,
51 posters: Vec::new()
52 }
53 }
54}
55
56impl<I, P> Inner<I, P> {
57 pub fn new() -> Mutex<Self> {
58 Mutex::new(Self::default())
59 }
60}
61
62
63struct Shared<I, P> {
64 inner: Mutex<Inner<I, P>>,
65 signal: Condvar
66}
67
68impl<I, P> Default for Shared<I, P> {
69 fn default() -> Self {
70 Self {
71 inner: Inner::new(),
72 signal: Condvar::new()
73 }
74 }
75}
76
77impl<I, P> Shared<I, P> {
78 fn new() -> Arc<Self> {
79 Arc::new(Self::default())
80 }
81}
82
83
84pub struct AutoPoster<I, P> {
88 sh: Weak<Shared<I, P>>,
89 jh: Option<thread::JoinHandle<Result<(), Error>>>
90}
91
92impl<I, P> AutoPoster<I, P>
93where
94 I: Send + 'static
95{
96 pub fn new<S, R, E>(
101 clnt: &Client<P, S, R, E>
102 ) -> Result<Self, std::io::Error>
103 where
104 P: Send + 'static,
105 S: Send + 'static,
106 R: Send + 'static,
107 E: Send + 'static
108 {
109 let sh = Shared::new();
110
111 let shc = Arc::clone(&sh);
113 let wclnt = clnt.weak();
114 let jh = thread::Builder::new()
115 .name("ump-ng autoposter".into())
116 .spawn(move || autoposter_thread(shc, wclnt))?;
117 Ok(Self {
118 sh: Arc::downgrade(&sh),
119 jh: Some(jh)
120 })
121 }
122
123 #[must_use]
125 pub fn controller(&self) -> AutoPostCtrl<I, P> {
126 AutoPostCtrl {
127 sh: Weak::clone(&self.sh)
128 }
129 }
130
131 pub const fn take_jh(
132 &mut self
133 ) -> Option<thread::JoinHandle<Result<(), Error>>> {
134 self.jh.take()
135 }
136}
137
138impl<I, P> Drop for AutoPoster<I, P> {
139 #[allow(clippy::significant_drop_tightening)]
140 fn drop(&mut self) {
141 if let Some(sh) = Weak::upgrade(&self.sh) {
142 let mut inner = sh.inner.lock();
144 inner.shutdown = true;
145 sh.signal.notify_one();
146 }
147
148 if let Some(jh) = self.jh.take() {
149 let _ = jh.join();
150 }
151 }
152}
153
154
155pub struct AutoPostCtrl<I, P> {
159 sh: Weak<Shared<I, P>>
160}
161
162impl<I, P> Clone for AutoPostCtrl<I, P> {
163 fn clone(&self) -> Self {
164 Self {
165 sh: Weak::clone(&self.sh)
166 }
167 }
168}
169
170impl<I, P> AutoPostCtrl<I, P>
171where
172 I: PartialEq
173{
174 #[allow(clippy::significant_drop_tightening)]
186 pub fn set(
187 &self,
188 ident: I,
189 dur: Duration,
190 cb: impl FnMut() -> Proc<P> + Send + 'static
191 ) -> Result<(), Error> {
192 let Some(sh) = Weak::upgrade(&self.sh) else {
193 return Err(Error::NotRunning);
194 };
195
196 let mut inner = sh.inner.lock();
197
198 if let Some(idx) = inner.posters.iter().position(|x| x.ident == ident) {
201 let n = unsafe { inner.posters.get_unchecked_mut(idx) };
203 n.dur = dur;
204 n.cb = Box::new(cb);
205 n.at = Instant::now() + dur;
206 } else {
207 inner.posters.push(Record {
208 ident,
209 dur,
210 at: Instant::now() + dur,
211 cb: Box::new(cb)
212 });
213 }
214 inner.dirty = true;
215 sh.signal.notify_one();
216
217 Ok(())
218 }
219
220 pub fn have(&self, ident: &I) -> Result<bool, Error> {
225 Weak::upgrade(&self.sh).map_or(Err(Error::NotRunning), |sh| {
226 let inner = sh.inner.lock();
227 Ok(inner.posters.iter().any(|x| x.ident == *ident))
228 })
229 }
230
231 #[allow(clippy::significant_drop_tightening)]
236 pub fn remove(&self, ident: &I) -> Result<(), Error> {
237 let Some(sh) = Weak::upgrade(&self.sh) else {
238 return Err(Error::NotRunning);
239 };
240
241 let mut inner = sh.inner.lock();
242 if let Some(idx) = inner.posters.iter().position(|x| x.ident == *ident) {
243 inner.posters.swap_remove(idx);
244 inner.dirty = true;
245 sh.signal.notify_one();
246 }
247
248 Ok(())
249 }
250}
251
252
253#[allow(clippy::significant_drop_tightening)]
254#[allow(clippy::needless_pass_by_value)]
255fn autoposter_thread<I, P, S, R, E>(
256 sh: Arc<Shared<I, P>>,
257 wclnt: WeakClient<P, S, R, E>
258) -> Result<(), Error>
259where
260 P: Send + 'static,
261 S: Send + 'static,
262 R: Send + 'static,
263 E: Send + 'static
264{
265 let mut inner = sh.inner.lock();
266
267 loop {
268 if inner.shutdown {
269 break Ok(());
271 }
272
273 if inner.dirty {
275 inner.posters.sort_unstable_by(|a, b| b.at.cmp(&a.at));
277 inner.dirty = false;
278 }
279
280 if let Some(mut n) = inner.posters.pop() {
282 if Instant::now() >= n.at {
283 match (n.cb)() {
284 Proc::PostReset(msg) => {
285 let Some(clnt) = wclnt.upgrade() else {
286 break Err(Error::ClientDisappeared);
288 };
289 if clnt.post(msg).is_err() {
290 break Err(Error::ServerDisappeared);
292 }
293
294 n.at = Instant::now() + n.dur;
296 inner.posters.push(n);
297
298 inner.dirty = true;
300 }
301 Proc::PostRemove(msg) => {
302 let Some(clnt) = wclnt.upgrade() else {
303 break Err(Error::ClientDisappeared);
305 };
306 if clnt.post(msg).is_err() {
307 break Err(Error::ServerDisappeared);
309 }
310 }
311 Proc::Remove => {
312 }
314 }
315 } else {
316 let at = n.at;
317 inner.posters.push(n);
318 sh.signal.wait_until(&mut inner, at);
320 }
321 } else {
322 sh.signal.wait(&mut inner);
325 }
326 }
327}
328
329