ump_ng_server/thread.rs
1//! ump-ng dispatch server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_ng_server::{
7//! thread::{Handler, spawn},
8//! ump_ng::{ReplyContext, WeakClient}
9//! };
10//!
11//! enum Post {
12//! ShoutIntoVoid
13//! }
14//! enum Request {
15//! Add(usize, usize)
16//! }
17//! enum Reply {
18//! Sum(usize)
19//! }
20//! #[derive(Debug)]
21//! enum MyError { }
22//!
23//! struct MyHandler {
24//! wclnt: WeakClient<Post, Request, Reply, MyError>
25//! }
26//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
27//! fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
28//! match msg {
29//! Post::ShoutIntoVoid => {
30//! // No reply .. but keep on trudging on
31//! ControlFlow::Continue(())
32//! }
33//! }
34//! }
35//! fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
36//! -> ControlFlow<(), ()> {
37//! match msg {
38//! Request::Add(a, b) => {
39//! rctx.reply(Reply::Sum(a+b)).unwrap();
40//! ControlFlow::Continue(())
41//! }
42//! }
43//! }
44//! }
45//!
46//! let (clnt, jh) = spawn(|clnt| {
47//! // Store a weak client in the handler so it doesn't keep the dispatch
48//! // loop alive when the Client returned to the application is dropped.
49//! Ok(MyHandler {
50//! wclnt: clnt.weak()
51//! })
52//! }).unwrap();
53//!
54//! clnt.post(Post::ShoutIntoVoid).unwrap();
55//!
56//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
57//! panic!("Unexpected reply");
58//! };
59//! assert_eq!(sum, 10);
60//!
61//! // drop client to force dispatch loop to terminate
62//! drop(clnt);
63//!
64//! jh.join();
65//! ```
66
67use std::{
68 ops::ControlFlow,
69 thread::{self, JoinHandle}
70};
71
72use super::{channel, Client, MsgType, ReplyContext};
73
74/// Message processing trait for a threaded handler.
75pub trait Handler<P, S, R, E, RV> {
76 /// Optional initialization callback.
77 ///
78 /// This is called on the dispatcher thread before the main message
79 /// processing loop is entered.
80 #[allow(unused_variables)]
81 fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
82
83 /// Post message processing callback.
84 ///
85 /// The callback must return `ControlFlow::Continue(())` to keep the
86 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
87 /// dispatcher loop to abort and returns the value in `RV` from the thread.
88 fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
89
90 /// Request message processing callback.
91 ///
92 /// The callback must return `ControlFlow::Continue(())` to keep the
93 /// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
94 /// dispatcher loop to abort and returns the value in `RV` from the thread.
95 fn req(&mut self, msg: S, rctx: ReplyContext<R, E>) -> ControlFlow<RV, ()>;
96
97 /// Optional termination callback.
98 ///
99 /// This is called on the dispatcher thread just after the main message
100 /// processing loop has been terminbated.
101 fn term(&mut self, rv: Option<RV>) -> Option<RV> {
102 rv
103 }
104}
105
106/// Launch a thread that will process incoming messages from an ump-ng server
107/// end-point.
108///
109/// See top module's documentation for an overview of the [dispatch
110/// loop](crate#dispatch-loop).
111///
112/// # Errors
113/// An application-defined error `E` is returned if the dispatch loop is
114/// terminated by a handler returning `ControlFlow::Break(E)`.
115#[allow(clippy::type_complexity)]
116pub fn spawn<P, S, R, E, RV, F>(
117 hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
118) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
119where
120 P: 'static + Send,
121 S: 'static + Send,
122 R: 'static + Send,
123 E: 'static + Send,
124 RV: 'static + Send,
125 F: Handler<P, S, R, E, RV> + Send + 'static
126{
127 let (server, client) = channel();
128
129 let mut handler = hbldr(&client)?;
130
131 #[cfg(feature = "watchdog")]
132 let wdog = crate::wdog::run();
133
134 let weak_client = client.weak();
135 let jh = thread::spawn(move || {
136 handler.init(weak_client);
137 let ret = loop {
138 match server.wait() {
139 Ok(msg) => {
140 #[cfg(feature = "watchdog")]
141 wdog.begin_process();
142
143 let res = match msg {
144 MsgType::Post(m) => handler.post(m),
145 MsgType::Request(m, rctx) => handler.req(m, rctx)
146 };
147
148 #[cfg(feature = "watchdog")]
149 wdog.end_process();
150
151 match res {
152 ControlFlow::Continue(()) => {}
153 ControlFlow::Break(rv) => break Some(rv)
154 }
155 }
156 Err(_) => break None
157 }
158 };
159
160 #[cfg(feature = "watchdog")]
161 let _ = wdog.kill();
162
163 handler.term(ret)
164 });
165
166 Ok((client, jh))
167}
168
169// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :