ump_server/thread.rs
1//! ump server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_server::{
7//! thread::{Handler, spawn},
8//! ReplyContext, WeakClient
9//! };
10//!
11//! enum Request {
12//! Add(usize, usize)
13//! }
14//! enum Reply {
15//! Sum(usize)
16//! }
17//! #[derive(Debug)]
18//! enum MyError { }
19//!
20//! struct MyHandler {
21//! wclnt: WeakClient<Request, Reply, MyError>
//! }
23//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
24//! fn proc_req(
25//! &mut self,
26//! msg: Request,
27//! rctx: ReplyContext<Reply, MyError>
28//! ) -> ControlFlow<(), ()> {
29//! match msg {
30//! Request::Add(a, b) => {
31//! rctx.reply(Reply::Sum(a + b));
32//! ControlFlow::Continue(())
33//! }
34//! }
35//! }
36//! }
37//!
38//! let (clnt, jh) = spawn(|clnt| {
39//! // Store a weak client in the handler so it doesn't keep the dispatch
40//! // loop alive when the Client returned to the application is dropped.
41//! Ok(MyHandler {
42//! wclnt: clnt.weak()
43//! })
44//! }).unwrap();
45//!
46//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
47//! panic!("Unexpected reply");
48//! };
49//! assert_eq!(sum, 10);
50//!
51//! // Dropping the only client will terminate the dispatch loop
52//! drop(clnt);
53//!
54//! let _ = jh.join();
55//! ```
56
57use std::{ops::ControlFlow, thread};
58
59use super::{channel, Client, ReplyContext};
60
61/// Message processing trait for a threaded handler.
62pub trait Handler<S, R, E, RV> {
63 /// Optional initialization callback.
64 ///
65 /// This is called on the dispatcher thread before the main message
66 /// dispatch loop is entered.
67 #[allow(unused_variables)]
68 fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
69
70 /// Message processing callback.
71 ///
72 /// The callback must return `ControlFlow::Continue(())` to keep the
73 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
74 /// dispatcher loop to abort and returns the value in `RV` from the thread.
75 fn proc_req(
76 &mut self,
77 msg: S,
78 rctx: ReplyContext<R, E>
79 ) -> ControlFlow<RV, ()>;
80
81 /// Optional termination callback.
82 ///
83 /// This is called on the dispatcher thread just after the main message
84 /// processing loop has been terminated.
85 ///
86 /// The `rv` argument is set to the return value returned from the dispatcher
87 /// loop. It will be set to `Some()` value if a request handler returned
88 /// `ControlFlow::Break(RV)`. If will be set to `None` if the dispatch loop
89 /// terminated because the queue is empty and all of the linked clients have
90 /// been dropped.
91 ///
92 /// The value returned from this callback is returned from the dispatcher
93 /// thread when it is joined.
94 ///
95 /// The default implementation simply returns the `rv` parameter.
96 fn term(&mut self, rv: Option<RV>) -> Option<RV> {
97 rv
98 }
99}
100
101/// Run a thread which will process incoming messages from an ump server
102/// end-point.
103///
104/// See top module's documentation for an overview of the [dispatch
105/// loop](crate#dispatch-loop).
106///
107/// # Errors
108/// An application-defined error `E` is returned if the dispatch loop is
109/// terminated by a handler returning `ControlFlow::Break(E)`.
110#[allow(clippy::type_complexity)]
111pub fn spawn<S, R, E, RV, F>(
112 hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
113) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E>
114where
115 S: 'static + Send,
116 R: 'static + Send,
117 E: 'static + Send,
118 RV: 'static + Send,
119 F: Handler<S, R, E, RV> + Send + 'static
120{
121 let (server, client) = channel();
122
123 let mut handler = hbldr(&client)?;
124
125 #[cfg(feature = "watchdog")]
126 let wdog = crate::wdog::run();
127
128 let weak_client = client.weak();
129 let jh = thread::spawn(move || {
130 handler.init(weak_client);
131 let ret = loop {
132 let (msg, rctx) = match server.wait() {
133 Ok(d) => d,
134 Err(_) => break None
135 };
136
137 #[cfg(feature = "watchdog")]
138 wdog.begin_process();
139
140 let res = handler.proc_req(msg, rctx);
141
142 #[cfg(feature = "watchdog")]
143 wdog.end_process();
144
145 match res {
146 ControlFlow::Continue(()) => {}
147 ControlFlow::Break(rv) => break Some(rv)
148 }
149 };
150
151 #[cfg(feature = "watchdog")]
152 let _ = wdog.kill();
153
154 handler.term(ret)
155 });
156
157 Ok((client, jh))
158}
159
160// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :