ump_ng_server/thread.rs
1//! ump-ng dispatch server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_ng_server::{
7//! thread::{Handler, spawn},
8//! ump_ng::{ReplyContext, WeakClient}
9//! };
10//!
11//! enum Post {
12//! ShoutIntoVoid
13//! }
14//! enum Request {
15//! Add(usize, usize)
16//! }
17//! enum Reply {
18//! Sum(usize)
19//! }
20//! #[derive(Debug)]
21//! enum MyError { }
22//!
23//! struct MyHandler {
24//! wclnt: WeakClient<Post, Request, Reply, MyError>
25//! }
26//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
27//! fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
28//! match msg {
29//! Post::ShoutIntoVoid => {
30//! // No reply .. but keep on trudging on
31//! ControlFlow::Continue(())
32//! }
33//! }
34//! }
35//! fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
36//! -> ControlFlow<(), ()> {
37//! match msg {
38//! Request::Add(a, b) => {
39//! rctx.reply(Reply::Sum(a+b)).unwrap();
40//! ControlFlow::Continue(())
41//! }
42//! }
43//! }
44//! }
45//!
46//! let (clnt, jh) = spawn(|clnt| {
47//! // Store a weak client in the handler so it doesn't keep the dispatch
48//! // loop alive when the Client returned to the application is dropped.
49//! Ok(MyHandler {
50//! wclnt: clnt.weak()
51//! })
52//! }).unwrap();
53//!
54//! clnt.post(Post::ShoutIntoVoid).unwrap();
55//!
56//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
57//! panic!("Unexpected reply");
58//! };
59//! assert_eq!(sum, 10);
60//!
61//! // drop client to force dispatch loop to terminate
62//! drop(clnt);
63//!
64//! jh.join();
65//! ```
66
67use std::{
68 ops::ControlFlow,
69 thread::{self, JoinHandle}
70};
71
72use super::{Client, MsgType, ReplyContext, channel};
73
74/// Message processing trait for a threaded handler.
75///
76/// See top module documentation for more information about [application
77/// message handlers](crate#application-message-handlers).
78pub trait Handler<P, S, R, E, RV> {
79 /// Optional initialization callback.
80 ///
81 /// This is called on the dispatcher thread before the main message
82 /// processing loop is entered.
83 #[allow(unused_variables)]
84 fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
85
86 /// Post message processing callback.
87 ///
88 /// The callback must return `ControlFlow::Continue(())` to keep the
89 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
90 /// dispatcher loop to abort and returns the value in `RV` from the thread.
91 fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
92
93 /// Request message processing callback.
94 ///
95 /// The callback must return `ControlFlow::Continue(())` to keep the
96 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
97 /// dispatcher loop to abort and returns the value in `RV` from the thread.
98 fn req(&mut self, msg: S, rctx: ReplyContext<R, E>) -> ControlFlow<RV, ()>;
99
100 /// Optional termination callback.
101 ///
102 /// This is called on the dispatcher thread just after the main message
103 /// processing loop has been terminbated.
104 fn term(&mut self, rv: Option<RV>) -> Option<RV> {
105 rv
106 }
107}
108
109/// Launch a thread that will process incoming messages from an ump-ng server
110/// end-point.
111///
112/// `hbldr` is a closure that is expected to return the handler object that
113/// will be responsible for processing received messages.
114///
115/// See top module's documentation for an overview of the [dispatch
116/// loop](crate#dispatch-loop) and what the [generic
117/// parameters](crate#generics) are for.
118///
119/// # Errors
120/// An application-defined error `E` is returned if the dispatch loop is
121/// terminated by a handler returning `ControlFlow::Break(E)`.
122#[allow(clippy::type_complexity)]
123pub fn spawn<P, S, R, E, RV, F>(
124 hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
125) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
126where
127 P: 'static + Send,
128 S: 'static + Send,
129 R: 'static + Send,
130 E: 'static + Send,
131 RV: 'static + Send,
132 F: Handler<P, S, R, E, RV> + Send + 'static
133{
134 let (server, client) = channel();
135
136 let mut handler = hbldr(&client)?;
137
138 #[cfg(feature = "watchdog")]
139 let wdog = crate::wdog::run();
140
141 let weak_client = client.weak();
142 let jh = thread::spawn(move || {
143 handler.init(weak_client);
144 let ret = loop {
145 match server.wait() {
146 Ok(msg) => {
147 #[cfg(feature = "watchdog")]
148 wdog.begin_process();
149
150 let res = match msg {
151 MsgType::Post(m) => handler.post(m),
152 MsgType::Request(m, rctx) => handler.req(m, rctx)
153 };
154
155 #[cfg(feature = "watchdog")]
156 wdog.end_process();
157
158 match res {
159 ControlFlow::Continue(()) => {}
160 ControlFlow::Break(rv) => break Some(rv)
161 }
162 }
163 Err(_) => break None
164 }
165 };
166
167 #[cfg(feature = "watchdog")]
168 let _ = wdog.kill();
169
170 handler.term(ret)
171 });
172
173 Ok((client, jh))
174}
175
176// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :