Skip to main content

ump_ng_server/
thread.rs

1//! ump-ng dispatch server running on a thread.
2//!
3//! # Example
4//! ```
5//! use std::ops::ControlFlow;
6//! use ump_ng_server::{
7//!   thread::{Handler, spawn},
8//!   ump_ng::{ReplyContext, WeakClient}
9//! };
10//!
11//! enum Post {
12//!   ShoutIntoVoid
13//! }
14//! enum Request {
15//!   Add(usize, usize)
16//! }
17//! enum Reply {
18//!   Sum(usize)
19//! }
20//! #[derive(Debug)]
21//! enum MyError { }
22//!
23//! struct MyHandler {
24//!   wclnt: WeakClient<Post, Request, Reply, MyError>
25//! }
26//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
27//!   fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
28//!     match msg {
29//!       Post::ShoutIntoVoid => {
30//!         // No reply .. but keep on trudging on
31//!         ControlFlow::Continue(())
32//!       }
33//!     }
34//!   }
35//!   fn req(&mut self, msg: Request, rctx: ReplyContext<Reply, MyError>)
36//!     -> ControlFlow<(), ()> {
37//!     match msg {
38//!       Request::Add(a, b) => {
39//!         rctx.reply(Reply::Sum(a+b)).unwrap();
40//!         ControlFlow::Continue(())
41//!       }
42//!     }
43//!   }
44//! }
45//!
46//! let (clnt, jh) = spawn(|clnt| {
47//!   // Store a weak client in the handler so it doesn't keep the dispatch
48//!   // loop alive when the Client returned to the application is dropped.
49//!   Ok(MyHandler {
50//!     wclnt: clnt.weak()
51//!   })
52//! }).unwrap();
53//!
54//! clnt.post(Post::ShoutIntoVoid).unwrap();
55//!
56//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
57//!   panic!("Unexpected reply");
58//! };
59//! assert_eq!(sum, 10);
60//!
61//! // drop client to force dispatch loop to terminate
62//! drop(clnt);
63//!
64//! jh.join();
65//! ```
66
67use std::{
68  ops::ControlFlow,
69  thread::{self, JoinHandle}
70};
71
72use super::{Client, MsgType, ReplyContext, channel};
73
74/// Message processing trait for a threaded handler.
75///
76/// See top module documentation for more information about [application
77/// message handlers](crate#application-message-handlers).
78pub trait Handler<P, S, R, E, RV> {
79  /// Optional initialization callback.
80  ///
81  /// This is called on the dispatcher thread before the main message
82  /// processing loop is entered.
83  #[allow(unused_variables)]
84  fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
85
86  /// Post message processing callback.
87  ///
88  /// The callback must return `ControlFlow::Continue(())` to keep the
89  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
90  /// dispatcher loop to abort and returns the value in `RV` from the thread.
91  fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
92
93  /// Request message processing callback.
94  ///
95  /// The callback must return `ControlFlow::Continue(())` to keep the
96  /// dispatcher loop going.  Returning `ControlFlow::Break(RV)` will cause the
97  /// dispatcher loop to abort and returns the value in `RV` from the thread.
98  fn req(&mut self, msg: S, rctx: ReplyContext<R, E>) -> ControlFlow<RV, ()>;
99
100  /// Optional termination callback.
101  ///
102  /// This is called on the dispatcher thread just after the main message
103  /// processing loop has been terminbated.
104  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
105    rv
106  }
107}
108
109/// Launch a thread that will process incoming messages from an ump-ng server
110/// end-point.
111///
112/// `hbldr` is a closure that is expected to return the handler object that
113/// will be responsible for processing received messages.
114///
115/// See top module's documentation for an overview of the [dispatch
116/// loop](crate#dispatch-loop) and what the [generic
117/// parameters](crate#generics) are for.
118///
119/// # Errors
120/// An application-defined error `E` is returned if the dispatch loop is
121/// terminated by a handler returning `ControlFlow::Break(E)`.
122#[allow(clippy::type_complexity)]
123pub fn spawn<P, S, R, E, RV, F>(
124  hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
125) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
126where
127  P: 'static + Send,
128  S: 'static + Send,
129  R: 'static + Send,
130  E: 'static + Send,
131  RV: 'static + Send,
132  F: Handler<P, S, R, E, RV> + Send + 'static
133{
134  let (server, client) = channel();
135
136  let mut handler = hbldr(&client)?;
137
138  #[cfg(feature = "watchdog")]
139  let wdog = crate::wdog::run();
140
141  let weak_client = client.weak();
142  let jh = thread::spawn(move || {
143    handler.init(weak_client);
144    let ret = loop {
145      match server.wait() {
146        Ok(msg) => {
147          #[cfg(feature = "watchdog")]
148          wdog.begin_process();
149
150          let res = match msg {
151            MsgType::Post(m) => handler.post(m),
152            MsgType::Request(m, rctx) => handler.req(m, rctx)
153          };
154
155          #[cfg(feature = "watchdog")]
156          wdog.end_process();
157
158          match res {
159            ControlFlow::Continue(()) => {}
160            ControlFlow::Break(rv) => break Some(rv)
161          }
162        }
163        Err(_) => break None
164      }
165    };
166
167    #[cfg(feature = "watchdog")]
168    let _ = wdog.kill();
169
170    handler.term(ret)
171  });
172
173  Ok((client, jh))
174}
175
176// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :