ump_ng_autopost/
lib.rs

//! [`AutoPoster`] allows an application to asynchronously post messages to a
//! ump-ng server (at regular intervals).
3
4mod err;
5
6use std::{
7  sync::{Arc, Weak},
8  thread,
9  time::{Duration, Instant}
10};
11
12use ump_ng::{Client, WeakClient};
13
14use parking_lot::{Condvar, Mutex};
15
16pub use err::Error;
17
18
/// Outcome returned by a poster callback.
///
/// The variant tells the worker thread whether a message should be posted
/// and whether the poster should stay registered afterwards.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Proc<P> {
  /// Post the message and re-arm the poster's timer.
  PostReset(P),

  /// Post the message and remove the poster (i.e. it will not run again
  /// unless it is registered anew).
  PostRemove(P),

  /// Remove the poster without posting anything.
  Remove
}
30
31/// Internal record used to keep track of a poster.
32struct Record<I, P> {
33  ident: I,
34  dur: Duration,
35  at: Instant,
36  cb: Box<dyn FnMut() -> Proc<P> + Send>
37}
38
39
40struct Inner<I, P> {
41  shutdown: bool,
42  dirty: bool,
43  posters: Vec<Record<I, P>>
44}
45
46impl<I, P> Default for Inner<I, P> {
47  fn default() -> Self {
48    Self {
49      shutdown: false,
50      dirty: false,
51      posters: Vec::new()
52    }
53  }
54}
55
56impl<I, P> Inner<I, P> {
57  pub fn new() -> Mutex<Self> {
58    Mutex::new(Self::default())
59  }
60}
61
62
63struct Shared<I, P> {
64  inner: Mutex<Inner<I, P>>,
65  signal: Condvar
66}
67
68impl<I, P> Default for Shared<I, P> {
69  fn default() -> Self {
70    Self {
71      inner: Inner::new(),
72      signal: Condvar::new()
73    }
74  }
75}
76
77impl<I, P> Shared<I, P> {
78  fn new() -> Arc<Self> {
79    Arc::new(Self::default())
80  }
81}
82
83
84/// Representation of a running auto-poster.
85///
86/// Once this is dropped, its worker thread will be terminated.
87pub struct AutoPoster<I, P> {
88  sh: Weak<Shared<I, P>>,
89  jh: Option<thread::JoinHandle<Result<(), Error>>>
90}
91
92impl<I, P> AutoPoster<I, P>
93where
94  I: Send + 'static
95{
96  /// Create a new `AutoPoster` instance.
97  ///
98  /// # Errors
99  /// If the backgound thread can not be spawned, `std::io::Error` is returned.
100  pub fn new<S, R, E>(
101    clnt: &Client<P, S, R, E>
102  ) -> Result<Self, std::io::Error>
103  where
104    P: Send + 'static,
105    S: Send + 'static,
106    R: Send + 'static,
107    E: Send + 'static
108  {
109    let sh = Shared::new();
110
111    // ToDo: autoclone
112    let shc = Arc::clone(&sh);
113    let wclnt = clnt.weak();
114    let jh = thread::Builder::new()
115      .name("ump-ng autoposter".into())
116      .spawn(move || autoposter_thread(shc, wclnt))?;
117    Ok(Self {
118      sh: Arc::downgrade(&sh),
119      jh: Some(jh)
120    })
121  }
122
123  /// Return a [`AutoPostCtrl`] that can be used to add/remove new autoposters.
124  #[must_use]
125  pub fn controller(&self) -> AutoPostCtrl<I, P> {
126    AutoPostCtrl {
127      sh: Weak::clone(&self.sh)
128    }
129  }
130
131  pub const fn take_jh(
132    &mut self
133  ) -> Option<thread::JoinHandle<Result<(), Error>>> {
134    self.jh.take()
135  }
136}
137
138impl<I, P> Drop for AutoPoster<I, P> {
139  #[allow(clippy::significant_drop_tightening)]
140  fn drop(&mut self) {
141    if let Some(sh) = Weak::upgrade(&self.sh) {
142      // Tell background thread to terminate
143      let mut inner = sh.inner.lock();
144      inner.shutdown = true;
145      sh.signal.notify_one();
146    }
147
148    if let Some(jh) = self.jh.take() {
149      let _ = jh.join();
150    }
151  }
152}
153
154
155/// Auto Post Controller.
156///
157/// Used to add/remove new poster callbacks.
158pub struct AutoPostCtrl<I, P> {
159  sh: Weak<Shared<I, P>>
160}
161
162impl<I, P> Clone for AutoPostCtrl<I, P> {
163  fn clone(&self) -> Self {
164    Self {
165      sh: Weak::clone(&self.sh)
166    }
167  }
168}
169
170impl<I, P> AutoPostCtrl<I, P>
171where
172  I: PartialEq
173{
174  /// Add/update an auto poster.
175  ///
176  /// `ident` is a unique identifier used to reference a specific auto-poster.
177  /// `dur` is the amount of time to wait before triggering the post.
178  /// `cb` is a callback closure that will be called to request the message to
179  /// be posted.
180  ///
181  /// The role of the closure is to return a [`Proc`]
182  ///
183  /// # Errors
184  /// [`Error::NotRunning`] means the background thread has been terminated.
185  #[allow(clippy::significant_drop_tightening)]
186  pub fn set(
187    &self,
188    ident: I,
189    dur: Duration,
190    cb: impl FnMut() -> Proc<P> + Send + 'static
191  ) -> Result<(), Error> {
192    let Some(sh) = Weak::upgrade(&self.sh) else {
193      return Err(Error::NotRunning);
194    };
195
196    let mut inner = sh.inner.lock();
197
198    // If ident already exists, then update existing record.  Otherwise just
199    // att it to the list.
200    if let Some(idx) = inner.posters.iter().position(|x| x.ident == ident) {
201      // SAFETY: We just found the entry
202      let n = unsafe { inner.posters.get_unchecked_mut(idx) };
203      n.dur = dur;
204      n.cb = Box::new(cb);
205      n.at = Instant::now() + dur;
206    } else {
207      inner.posters.push(Record {
208        ident,
209        dur,
210        at: Instant::now() + dur,
211        cb: Box::new(cb)
212      });
213    }
214    inner.dirty = true;
215    sh.signal.notify_one();
216
217    Ok(())
218  }
219
220  /// Check if an identifier is currently in use.
221  ///
222  /// # Errors
223  /// [`Error::NotRunning`] means the background thread has been terminated.
224  pub fn have(&self, ident: &I) -> Result<bool, Error> {
225    Weak::upgrade(&self.sh).map_or(Err(Error::NotRunning), |sh| {
226      let inner = sh.inner.lock();
227      Ok(inner.posters.iter().any(|x| x.ident == *ident))
228    })
229  }
230
231  /// Remove an auto-poster instance.
232  ///
233  /// # Errors
234  /// [`Error::NotRunning`] means the background thread has been terminated.
235  #[allow(clippy::significant_drop_tightening)]
236  pub fn remove(&self, ident: &I) -> Result<(), Error> {
237    let Some(sh) = Weak::upgrade(&self.sh) else {
238      return Err(Error::NotRunning);
239    };
240
241    let mut inner = sh.inner.lock();
242    if let Some(idx) = inner.posters.iter().position(|x| x.ident == *ident) {
243      inner.posters.swap_remove(idx);
244      inner.dirty = true;
245      sh.signal.notify_one();
246    }
247
248    Ok(())
249  }
250}
251
252
253#[allow(clippy::significant_drop_tightening)]
254#[allow(clippy::needless_pass_by_value)]
255fn autoposter_thread<I, P, S, R, E>(
256  sh: Arc<Shared<I, P>>,
257  wclnt: WeakClient<P, S, R, E>
258) -> Result<(), Error>
259where
260  P: Send + 'static,
261  S: Send + 'static,
262  R: Send + 'static,
263  E: Send + 'static
264{
265  let mut inner = sh.inner.lock();
266
267  loop {
268    if inner.shutdown {
269      //println!("shutdown");
270      break Ok(());
271    }
272
273    // If the list of posters is "dirty" then sort it
274    if inner.dirty {
275      // Sort list so that nearest-in-time are later in the list
276      inner.posters.sort_unstable_by(|a, b| b.at.cmp(&a.at));
277      inner.dirty = false;
278    }
279
280    // Take next one off the list
281    if let Some(mut n) = inner.posters.pop() {
282      if Instant::now() >= n.at {
283        match (n.cb)() {
284          Proc::PostReset(msg) => {
285            let Some(clnt) = wclnt.upgrade() else {
286              //println!("client disappeared");
287              break Err(Error::ClientDisappeared);
288            };
289            if clnt.post(msg).is_err() {
290              //println!("server disappeared");
291              break Err(Error::ServerDisappeared);
292            }
293
294            // Reset timer and push the node back on to the list
295            n.at = Instant::now() + n.dur;
296            inner.posters.push(n);
297
298            // Mark list as dirty
299            inner.dirty = true;
300          }
301          Proc::PostRemove(msg) => {
302            let Some(clnt) = wclnt.upgrade() else {
303              //println!("client disappeared");
304              break Err(Error::ClientDisappeared);
305            };
306            if clnt.post(msg).is_err() {
307              //println!("server disappeared");
308              break Err(Error::ServerDisappeared);
309            }
310          }
311          Proc::Remove => {
312            // Do nothing
313          }
314        }
315      } else {
316        let at = n.at;
317        inner.posters.push(n);
318        //println!("Wait until: {at:?}");
319        sh.signal.wait_until(&mut inner, at);
320      }
321    } else {
322      // Wait forever
323      //println!("Wait forever");
324      sh.signal.wait(&mut inner);
325    }
326  }
327}
328
329// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :