ump_ng_server/
thread.rs

1//! ump-ng dispatch server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_ng_server::{
7//!   thread::{Handler, spawn},
8//!   ump_ng::{ReplyContext, WeakClient}
9//! };
10//!
11//! enum Post {
12//!   ShoutIntoVoid
13//! }
14//! enum Request {
15//!   Add(usize, usize)
16//! }
17//! enum Reply {
18//!   Sum(usize)
19//! }
20//! #[derive(Debug)]
21//! enum MyError { }
22//!
23//! struct MyHandler {
24//!   wclnt: WeakClient<Post, Request, Reply, MyError>
25//! }
26//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
27//!   fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
28//!     match msg {
29//!       Post::ShoutIntoVoid => {
30//!         // No reply .. but keep on trudging on
31//!         ControlFlow::Continue(())
32//!       }
33//!     }
34//!   }
35//!   fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
36//!     -> ControlFlow<(), ()> {
37//!     match msg {
38//!       Request::Add(a, b) => {
39//!         rctx.reply(Reply::Sum(a+b)).unwrap();
40//!         ControlFlow::Continue(())
41//!       }
42//!     }
43//!   }
44//! }
45//!
46//! let (clnt, jh) = spawn(|clnt| {
47//!   // Store a weak client in the handler so it doesn't keep the dispatch
48//!   // loop alive when the Client returned to the application is dropped.
49//!   Ok(MyHandler {
50//!     wclnt: clnt.weak()
51//!   })
52//! }).unwrap();
53//!
54//! clnt.post(Post::ShoutIntoVoid).unwrap();
55//!
56//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
57//!   panic!("Unexpected reply");
58//! };
59//! assert_eq!(sum, 10);
60//!
61//! // drop client to force dispatch loop to terminate
62//! drop(clnt);
63//!
64//! jh.join();
65//! ```
66
67use std::{
68  ops::ControlFlow,
69  thread::{self, JoinHandle}
70};
71
72use super::{channel, Client, MsgType, ReplyContext};
73
74/// Message processing trait for a threaded handler.
75pub trait Handler<P, S, R, E, RV> {
76  /// Optional initialization callback.
77  ///
78  /// This is called on the dispatcher thread before the main message
79  /// processing loop is entered.
80  #[allow(unused_variables)]
81  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
82
83  /// Post message processing callback.
84  ///
85  /// The callback must return `ControlFlow::Continue(())` to keep the
86  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
87  /// dispatcher loop to abort and returns the value in `RV` from the thread.
88  fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
89
90  /// Request message processing callback.
91  ///
92  /// The callback must return `ControlFlow::Continue(())` to keep the
93  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
94  /// dispatcher loop to abort and returns the value in `RV` from the thread.
95  fn req(&mut self, msg: S, rctx: ReplyContext<R, E>) -> ControlFlow<RV, ()>;
96
97  /// Optional termination callback.
98  ///
99  /// This is called on the dispatcher thread just after the main message
100  /// processing loop has been terminbated.
101  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
102    rv
103  }
104}
105
106/// Launch a thread that will process incoming messages from an ump-ng server
107/// end-point.
108///
109/// See top module's documentation for an overview of the [dispatch
110/// loop](crate#dispatch-loop).
111///
112/// # Errors
113/// An application-defined error `E` is returned if the dispatch loop is
114/// terminated by a handler returning `ControlFlow::Break(E)`.
115#[allow(clippy::type_complexity)]
116pub fn spawn<P, S, R, E, RV, F>(
117  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
118) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
119where
120  P: 'static + Send,
121  S: 'static + Send,
122  R: 'static + Send,
123  E: 'static + Send,
124  RV: 'static + Send,
125  F: Handler<P, S, R, E, RV> + Send + 'static
126{
127  let (server, client) = channel();
128
129  let mut handler = hbldr(&client)?;
130
131  #[cfg(feature = "watchdog")]
132  let wdog = crate::wdog::run();
133
134  let weak_client = client.weak();
135  let jh = thread::spawn(move || {
136    handler.init(weak_client);
137    let ret = loop {
138      match server.wait() {
139        Ok(msg) => {
140          #[cfg(feature = "watchdog")]
141          wdog.begin_process();
142
143          let res = match msg {
144            MsgType::Post(m) => handler.post(m),
145            MsgType::Request(m, rctx) => handler.req(m, rctx)
146          };
147
148          #[cfg(feature = "watchdog")]
149          wdog.end_process();
150
151          match res {
152            ControlFlow::Continue(()) => {}
153            ControlFlow::Break(rv) => break Some(rv)
154          }
155        }
156        Err(_) => break None
157      }
158    };
159
160    #[cfg(feature = "watchdog")]
161    let _ = wdog.kill();
162
163    handler.term(ret)
164  });
165
166  Ok((client, jh))
167}
168
169// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :