ump_server/
thread.rs

1//! ump server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_server::{
7//!   thread::{Handler, spawn},
8//!   ReplyContext, WeakClient
9//! };
10//!
11//! enum Request {
12//!   Add(usize, usize)
13//! }
14//! enum Reply {
15//!   Sum(usize)
16//! }
17//! #[derive(Debug)]
18//! enum MyError { }
19//!
20//! struct MyHandler {
21//!   wclnt: WeakClient<Request, Reply, MyError>
22//! };
23//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
24//!   fn proc_req(
25//!     &mut self,
26//!     msg: Request,
27//!     rctx: ReplyContext<Reply, MyError>
28//!   ) -> ControlFlow<(), ()> {
29//!     match msg {
30//!       Request::Add(a, b) => {
31//!         rctx.reply(Reply::Sum(a + b));
32//!         ControlFlow::Continue(())
33//!       }
34//!     }
35//!   }
36//! }
37//!
38//! let (clnt, jh) = spawn(|clnt| {
39//!   // Store a weak client in the handler so it doesn't keep the dispatch
40//!   // loop alive when the Client returned to the application is dropped.
41//!   Ok(MyHandler {
42//!     wclnt: clnt.weak()
43//!   })
44//! }).unwrap();
45//!
46//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
47//!   panic!("Unexpected reply");
48//! };
49//! assert_eq!(sum, 10);
50//!
51//! // Dropping the only client will terminate the dispatch loop
52//! drop(clnt);
53//!
54//! let _ = jh.join();
55//! ```
56
57use std::{ops::ControlFlow, thread};
58
59use super::{channel, Client, ReplyContext};
60
61/// Message processing trait for a threaded handler.
62pub trait Handler<S, R, E, RV> {
63  /// Optional initialization callback.
64  ///
65  /// This is called on the dispatcher thread before the main message
66  /// dispatch loop is entered.
67  #[allow(unused_variables)]
68  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
69
70  /// Message processing callback.
71  ///
72  /// The callback must return `ControlFlow::Continue(())` to keep the
73  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
74  /// dispatcher loop to abort and returns the value in `RV` from the thread.
75  fn proc_req(
76    &mut self,
77    msg: S,
78    rctx: ReplyContext<R, E>
79  ) -> ControlFlow<RV, ()>;
80
81  /// Optional termination callback.
82  ///
83  /// This is called on the dispatcher thread just after the main message
84  /// processing loop has been terminated.
85  ///
86  /// The `rv` argument is set to the return value returned from the dispatcher
87  /// loop.  It will be set to `Some()` value if a request handler returned
88  /// `ControlFlow::Break(RV)`.  If will be set to `None` if the dispatch loop
89  /// terminated because the queue is empty and all of the linked clients have
90  /// been dropped.
91  ///
92  /// The value returned from this callback is returned from the dispatcher
93  /// thread when it is joined.
94  ///
95  /// The default implementation simply returns the `rv` parameter.
96  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
97    rv
98  }
99}
100
101/// Run a thread which will process incoming messages from an ump server
102/// end-point.
103///
104/// See top module's documentation for an overview of the [dispatch
105/// loop](crate#dispatch-loop).
106///
107/// # Errors
108/// An application-defined error `E` is returned if the dispatch loop is
109/// terminated by a handler returning `ControlFlow::Break(E)`.
110#[allow(clippy::type_complexity)]
111pub fn spawn<S, R, E, RV, F>(
112  hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
113) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E>
114where
115  S: 'static + Send,
116  R: 'static + Send,
117  E: 'static + Send,
118  RV: 'static + Send,
119  F: Handler<S, R, E, RV> + Send + 'static
120{
121  let (server, client) = channel();
122
123  let mut handler = hbldr(&client)?;
124
125  #[cfg(feature = "watchdog")]
126  let wdog = crate::wdog::run();
127
128  let weak_client = client.weak();
129  let jh = thread::spawn(move || {
130    handler.init(weak_client);
131    let ret = loop {
132      let (msg, rctx) = match server.wait() {
133        Ok(d) => d,
134        Err(_) => break None
135      };
136
137      #[cfg(feature = "watchdog")]
138      wdog.begin_process();
139
140      let res = handler.proc_req(msg, rctx);
141
142      #[cfg(feature = "watchdog")]
143      wdog.end_process();
144
145      match res {
146        ControlFlow::Continue(()) => {}
147        ControlFlow::Break(rv) => break Some(rv)
148      }
149    };
150
151    #[cfg(feature = "watchdog")]
152    let _ = wdog.kill();
153
154    handler.term(ret)
155  });
156
157  Ok((client, jh))
158}
159
160// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :