1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
//! ump-ng dispatch server running on a thread.
//!
//! # Example
//! ```
//! # tokio_test::block_on(async {
//! use std::ops::ControlFlow;
//! use ump_ng_server::{
//! async_trait,
//! task::{Handler, spawn},
//! ump_ng::{ReplyContext, WeakClient}
//! };
//!
//! enum Post {
//! ShoutIntoVoid
//! }
//! enum Request {
//! Add(usize, usize)
//! }
//! enum Reply {
//! Sum(usize)
//! }
//! #[derive(Debug)]
//! enum MyError { }
//!
//! struct MyHandler {
//! wclnt: WeakClient<Post, Request, Reply, MyError>
//! }
//! #[async_trait]
//! impl Handler<Post, Request, Reply, MyError, ()> for MyHandler {
//! async fn post(&mut self, msg: Post) -> ControlFlow<(), ()> {
//! match msg {
//! Post::ShoutIntoVoid => {
//! // No reply .. but keep on trudging on
//! ControlFlow::Continue(())
//! }
//! }
//! }
//! async fn req(
//! &mut self,
//! msg: Request,
//! rctx: ReplyContext<Reply, MyError>
//! ) -> ControlFlow<(), ()> {
//! match msg {
//! Request::Add(a, b) => {
//! rctx.reply(Reply::Sum(a+b)).unwrap();
//! ControlFlow::Continue(())
//! }
//! }
//! }
//! }
//!
//! let (clnt, jh) = spawn(|clnt| {
//! // Store a weak client in the handler so it doesn't keep the dispatch
//! // loop alive when the Client returned to the application is dropped.
//! Ok(MyHandler {
//! wclnt: clnt.weak()
//! })
//! }).unwrap();
//!
//! clnt.post(Post::ShoutIntoVoid).unwrap();
//!
//! let Ok(Reply::Sum(sum)) = clnt.areq(Request::Add(3, 7)).await else {
//! panic!("Unexpected reply");
//! };
//! assert_eq!(sum, 10);
//!
//! // drop client to force dispatch loop to terminate
//! drop(clnt);
//!
//! jh.await;
//! # });
//! ```
use std::ops::ControlFlow;
use tokio::task::{self, JoinHandle};
use async_trait::async_trait;
use super::{channel, Client, MsgType, ReplyContext};
/// Message processing trait for an async handler.
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
/// Optional initialization callback.
///
/// This is called on the dispatcher task before the main message
/// processing loop is entered.
#[allow(unused_variables)]
fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
/// Put message processing callback.
///
/// The callback must return `ControlFlow::Continue(())` to keep the
/// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
/// dispatcher loop to abort and returns the value in `RV` from the task.
async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
/// Request message processing callback.
///
/// The callback must return `ControlFlow::Continue(())` to keep the
/// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
/// dispatcher loop to abort and returns the value in `RV` from the task.
async fn req(
&mut self,
msg: S,
rctx: ReplyContext<R, E>
) -> ControlFlow<RV, ()>;
/// Optional termination callback.
///
/// This is called on the dispatcher task just after the main message
/// processing loop has been terminbated.
fn term(&mut self, rv: Option<RV>) -> Option<RV> {
rv
}
}
/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
#[allow(clippy::type_complexity)]
pub fn spawn<P, S, R, E, RV, F>(
hbldr: impl FnOnce(&Client<P, S, R, E>) -> Result<F, E>
) -> Result<(Client<P, S, R, E>, JoinHandle<Option<RV>>), E>
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
RV: 'static + Send,
F: Handler<P, S, R, E, RV> + Send + 'static
{
let (server, client) = channel();
let mut handler = hbldr(&client)?;
#[cfg(feature = "watchdog")]
let wdog = crate::wdog::run();
let weak_client = client.weak();
let jh = task::spawn(async move {
handler.init(weak_client);
let ret = loop {
match server.async_wait().await {
Ok(msg) => {
#[cfg(feature = "watchdog")]
wdog.begin_process();
let res = match msg {
MsgType::Put(m) => handler.post(m).await,
MsgType::Request(m, rctx) => handler.req(m, rctx).await
};
#[cfg(feature = "watchdog")]
wdog.end_process();
match res {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
}
}
Err(_) => break None
}
};
#[cfg(feature = "watchdog")]
let _ = wdog.kill();
handler.term(ret)
});
Ok((client, jh))
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :