ump_server/thread.rs
1//! ump server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_server::{
7//! thread::{Handler, spawn},
8//! ReplyContext, WeakClient
9//! };
10//!
11//! enum Request {
12//! Add(usize, usize)
13//! }
14//! enum Reply {
15//! Sum(usize)
16//! }
17//! #[derive(Debug)]
18//! enum MyError { }
19//!
20//! struct MyHandler {
21//! wclnt: WeakClient<Request, Reply, MyError>
22//! };
23//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
24//! fn proc_req(
25//! &mut self,
26//! msg: Request,
27//! rctx: ReplyContext<Reply, MyError>
28//! ) -> ControlFlow<(), ()> {
29//! match msg {
30//! Request::Add(a, b) => {
31//! rctx.reply(Reply::Sum(a + b));
32//! ControlFlow::Continue(())
33//! }
34//! }
35//! }
36//! }
37//!
38//! let (clnt, jh) = spawn(|clnt| {
39//! // Store a weak client in the handler so it doesn't keep the dispatch
40//! // loop alive when the Client returned to the application is dropped.
41//! Ok(MyHandler {
42//! wclnt: clnt.weak()
43//! })
44//! }).unwrap();
45//!
46//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
47//! panic!("Unexpected reply");
48//! };
49//! assert_eq!(sum, 10);
50//!
51//! // Dropping the only client will terminate the dispatch loop
52//! drop(clnt);
53//!
54//! let _ = jh.join();
55//! ```
56
57use std::{ops::ControlFlow, thread};
58
59use super::{Client, ReplyContext, channel};
60
61/// Message processing trait for a threaded handler.
62///
63/// See top module documentation for more information about [application
64/// message handlers](crate#application-message-handlers).
65pub trait Handler<S, R, E, RV> {
66 /// Optional initialization callback.
67 ///
68 /// This is called on the dispatcher thread before the main message
69 /// dispatch loop is entered.
70 #[allow(unused_variables)]
71 fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
72
73 /// Message processing callback.
74 ///
75 /// The callback must return `ControlFlow::Continue(())` to keep the
76 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
77 /// dispatcher loop to abort and returns the value in `RV` from the thread.
78 fn proc_req(
79 &mut self,
80 msg: S,
81 rctx: ReplyContext<R, E>
82 ) -> ControlFlow<RV, ()>;
83
84 /// Optional termination callback.
85 ///
86 /// This is called on the dispatcher thread just after the main message
87 /// processing loop has been terminated.
88 ///
89 /// The `rv` argument is set to the return value returned from the dispatcher
90 /// loop. It will be set to `Some()` value if a request handler returned
91 /// `ControlFlow::Break(RV)`. If will be set to `None` if the dispatch loop
92 /// terminated because the queue is empty and all of the linked clients have
93 /// been dropped.
94 ///
95 /// The value returned from this callback is returned from the dispatcher
96 /// thread when it is joined.
97 ///
98 /// The default implementation simply returns the `rv` parameter.
99 fn term(&mut self, rv: Option<RV>) -> Option<RV> {
100 rv
101 }
102}
103
104/// Run a thread which will process incoming messages from an ump server
105/// end-point.
106///
107/// `hbldr` is a closure that should spawn a [`Handler`].
108///
109/// See top module's documentation for an overview of the [dispatch
110/// loop](crate#dispatch-loop) and what the [generic
111/// parameters](crate#generics) are for.
112///
113/// # Errors
114/// An application-defined error `E` is returned if the dispatch loop is
115/// terminated by a handler returning `ControlFlow::Break(E)`.
116#[allow(clippy::type_complexity)]
117pub fn spawn<S, R, E, RV, F>(
118 hbldr: impl FnOnce(&Client<S, R, E>) -> Result<F, E>
119) -> Result<(Client<S, R, E>, thread::JoinHandle<Option<RV>>), E>
120where
121 S: 'static + Send,
122 R: 'static + Send,
123 E: 'static + Send,
124 RV: 'static + Send,
125 F: Handler<S, R, E, RV> + Send + 'static
126{
127 let (server, client) = channel();
128
129 let mut handler = hbldr(&client)?;
130
131 #[cfg(feature = "watchdog")]
132 let wdog = crate::wdog::run();
133
134 let weak_client = client.weak();
135 let jh = thread::spawn(move || {
136 handler.init(weak_client);
137 let ret = loop {
138 let Ok((msg, rctx)) = server.wait() else {
139 break None;
140 };
141
142 #[cfg(feature = "watchdog")]
143 wdog.begin_process();
144
145 let res = handler.proc_req(msg, rctx);
146
147 #[cfg(feature = "watchdog")]
148 wdog.end_process();
149
150 match res {
151 ControlFlow::Continue(()) => {}
152 ControlFlow::Break(rv) => break Some(rv)
153 }
154 };
155
156 #[cfg(feature = "watchdog")]
157 let _ = wdog.kill();
158
159 handler.term(ret)
160 });
161
162 Ok((client, jh))
163}
164
165// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :