//! ump server running on a thread.
//!
//! ```
//! use std::ops::ControlFlow;
//! use ump_server::{
//! thread::{Handler, spawn},
//! ump::ReplyContext
//! };
//! enum Request {
//! Add(usize, usize)
//! }
//! enum Reply {
//! Sum(usize)
//! }
//! enum MyError { }
//! struct MyHandler {}
//! impl Handler<Request, Reply, MyError, ()> for MyHandler {
//! fn proc_req(
//! &mut self,
//! msg: Request,
//! rctx: ReplyContext<Reply, MyError>
//! ) -> ControlFlow<(), ()> {
//! match msg {
//! Request::Add(a, b) => {
//! rctx.reply(Reply::Sum(a + b));
//! ControlFlow::Continue(())
//! }
//! }
//! }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//! MyHandler { }
//! });
//!
//! let Ok(Reply::Sum(sum)) = clnt.req(Request::Add(3, 7)) else {
//! panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // Dropping the only client will terminate the dispatch loop
//! drop(clnt);
//!
//! let _ = jh.join();
//! ```
use std::{ops::ControlFlow, thread};
use super::{channel, Client, ReplyContext};
/// Callbacks through which the threaded dispatcher drives an
/// application-defined request handler.
///
/// `S` is the request message type handed to `proc_req()`, `R` and `E` are
/// the reply and error types carried by the `ReplyContext`, and `RV` is the
/// type of the value a handler can use to break out of the dispatch loop.
pub trait Handler<S, R, E, RV> {
  /// Optional one-time setup hook.
  ///
  /// Invoked on the dispatcher thread, before the message dispatch loop is
  /// entered.  `weak_client` is a weak client handle supplied by the
  /// dispatcher.
  ///
  /// The default implementation does nothing.
  // The default body intentionally ignores its argument; silence the lint
  // here rather than renaming the parameter.
  #[allow(unused_variables)]
  fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}

  /// Handle a single incoming request.
  ///
  /// `msg` is the request and `rctx` is the context used to send the reply
  /// back to the requesting client.
  ///
  /// Return `ControlFlow::Continue(())` to keep the dispatcher loop running.
  /// Returning `ControlFlow::Break(RV)` aborts the dispatcher loop; the
  /// carried value is passed to `term()` as `Some(RV)` and, unless `term()`
  /// changes it, becomes the return value of the dispatcher thread.
  fn proc_req(
    &mut self,
    msg: S,
    rctx: ReplyContext<R, E>
  ) -> ControlFlow<RV, ()>;

  /// Optional shutdown hook.
  ///
  /// Invoked on the dispatcher thread right after the message processing
  /// loop has terminated.
  ///
  /// `rv` reports how the loop ended.  It is `Some(value)` if a request
  /// handler returned `ControlFlow::Break(value)`.  It is `None` if the
  /// dispatch loop terminated because the queue is empty and all of the
  /// linked clients have been dropped.
  ///
  /// Whatever this callback returns is what the dispatcher thread yields
  /// when it is joined.
  ///
  /// The default implementation passes `rv` through unchanged.
  fn term(&mut self, rv: Option<RV>) -> Option<RV> {
    rv
  }
}
/// Launch a thread that processes the messages arriving at a newly created
/// ump server end-point.
///
/// `hbldr` is called once, on the calling thread, with a reference to the
/// new channel's [`Client`].  The [`Handler`] it returns is moved onto the
/// spawned thread.
///
/// On the spawned thread, `Handler::init()` is called first with a weak
/// client.  After that, requests are waited for on the server end-point and
/// passed to `Handler::proc_req()` one at a time.  The loop ends in one of
/// two ways:
///
/// - `proc_req()` returns `ControlFlow::Break(rv)`, which produces
///   `Some(rv)`.
/// - Waiting for the next request fails, which produces `None`.
///
/// That value is passed to `Handler::term()`, and whatever `term()` returns
/// becomes the thread's result.
///
/// Returns the client end-point together with the `JoinHandle` of the
/// dispatcher thread.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV, F>(
  hbldr: impl FnOnce(&Client<S, R, E>) -> F
) -> (Client<S, R, E>, thread::JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send,
  F: Handler<S, R, E, RV> + Send + 'static
{
  let (srv, clnt) = channel();

  // Let the caller construct the handler while it can still see the client.
  let mut hnd = hbldr(&clnt);

  // The handler is only ever given a weak client.
  let weak = clnt.weak();

  let worker = move || {
    hnd.init(weak);

    let outcome = loop {
      // A failed wait ends the loop without a handler-supplied value.
      let Ok((msg, rctx)) = srv.wait() else {
        break None;
      };

      if let ControlFlow::Break(rv) = hnd.proc_req(msg, rctx) {
        break Some(rv);
      }
    };

    hnd.term(outcome)
  };

  (clnt, thread::spawn(worker))
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :