// ump_server/thread.rs

//! ump server running on a thread.
//!
//! # Example
//! ```
//! use std::ops::ControlFlow;
//! use ump_server::{
//!   thread::{Handler, spawn},
//!   ReplyContext, WeakClient
//! };
//!
//! enum Request {
//!   Add(usize, usize)
//! }
//! enum Reply {
//!   Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//!   wclnt: WeakClient<Request, Reply, MyError>
//! };
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//!   fn proc_req(
//!     &mut self,
//!     msg: Request,
//!     rctx: ReplyContext<Reply, MyError>
//!   ) -> ControlFlow<(), ()> {
//!     match msg {
//!       Request::Add(a, b) => {
//!         rctx.reply(Reply::Sum(a + b));
//!         ControlFlow::Continue(())
//!       }
//!     }
//!   }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//!   // Store a weak client in the handler so it doesn't keep the dispatch
//!   // loop alive when the Client returned to the application is dropped.
//!   Ok(MyHandler {
//!     wclnt: clnt.weak()
//!   })
//! }).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//!   panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
//! drop(clnt);
//!
//! let _ = jh.join();
//! ```

use std::{ops::ControlFlow, thread};

use super::{Client, ReplyContext, channel};

61/// Message processing trait for a threaded handler.
62///
63/// See top module documentation for more information about [application
64/// message handlers](crate#application-message-handlers).
65pub trait Handler<S, R, E, RV> {
66  /// Optional initialization callback.
67  ///
68  /// This is called on the dispatcher thread before the main message
69  /// dispatch loop is entered.
70  #[allow(unused_variables)]
71  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
72
73  /// Message processing callback.
74  ///
75  /// The callback must return `ControlFlow::Continue(())` to keep the
76  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
77  /// dispatcher loop to abort and returns the value in `RV` from the thread.
78  fn proc_req(
79    &mut self,
80    msg: S,
81    rctx: ReplyContext<R, E>
82  ) -> ControlFlow<RV, ()>;
83
84  /// Optional termination callback.
85  ///
86  /// This is called on the dispatcher thread just after the main message
87  /// processing loop has been terminated.
88  ///
89  /// The `rv` argument is set to the return value returned from the dispatcher
90  /// loop.  It will be set to `Some()` value if a request handler returned
91  /// `ControlFlow::Break(RV)`.  If will be set to `None` if the dispatch loop
92  /// terminated because the queue is empty and all of the linked clients have
93  /// been dropped.
94  ///
95  /// The value returned from this callback is returned from the dispatcher
96  /// thread when it is joined.
97  ///
98  /// The default implementation simply returns the `rv` parameter.
99  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
100    rv
101  }
102}
103
104/// Run a thread which will process incoming messages from an ump server
105/// end-point.
106///
107/// `hbldr` is a closure that should spawn a [`Handler`].
108///
109/// See top module's documentation for an overview of the [dispatch
110/// loop](crate#dispatch-loop) and what the [generic
111/// parameters](crate#generics) are for.
112///
113/// # Errors
114/// An application-defined error `E` is returned if the dispatch loop is
115/// terminated by a handler returning `ControlFlow::Break(E)`.
116#[allow(clippy::type_complexity)]
117pub fn spawn<S, R, E, RV, F>(
118  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
119) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E>
120where
121  S: 'static + Send,
122  R: 'static + Send,
123  E: 'static + Send,
124  RV: 'static + Send,
125  F: Handler<S, R, E, RV> + Send + 'static
126{
127  let (server, client) = channel();
128
129  let mut handler = hbldr(&client)?;
130
131  #[cfg(feature = "watchdog")]
132  let wdog = crate::wdog::run();
133
134  let weak_client = client.weak();
135  let jh = thread::spawn(move || {
136    handler.init(weak_client);
137    let ret = loop {
138      let Ok((msg, rctx)) = server.wait() else {
139        break None;
140      };
141
142      #[cfg(feature = "watchdog")]
143      wdog.begin_process();
144
145      let res = handler.proc_req(msg, rctx);
146
147      #[cfg(feature = "watchdog")]
148      wdog.end_process();
149
150      match res {
151        ControlFlow::Continue(()) => {}
152        ControlFlow::Break(rv) => break Some(rv)
153      }
154    };
155
156    #[cfg(feature = "watchdog")]
157    let _ = wdog.kill();
158
159    handler.term(ret)
160  });
161
162  Ok((client, jh))
163}
164
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :