1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
use std::ops::ControlFlow;
use tokio::task::{self, JoinHandle};
use async_trait::async_trait;
use super::{channel, Client, MsgType, ReplyContext, Server};
#[async_trait]
pub trait Handler<P, S, R, E, RV> {
/// Optional initialization callback.
///
/// This is called on the dispatcher task before the main message
/// processing loop is entered.
#[allow(unused_variables)]
fn init(&mut self, weak_client: ump_ng::WeakClient<P, S, R, E>) {}
/// Put message processing callback.
///
/// The callback must return `ControlFlow::Continue(())` to keep the
/// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
/// dispatcher loop to abort and returns the value in `RV` from the task.
async fn post(&mut self, msg: P) -> ControlFlow<RV, ()>;
/// Request message processing callback.
///
/// The callback must return `ControlFlow::Continue(())` to keep the
/// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
/// dispatcher loop to abort and returns the value in `RV` from the task.
async fn req(
&mut self,
msg: S,
rctx: ReplyContext<R, E>
) -> ControlFlow<RV, ()>;
/// Optional termination callback.
///
/// This is called on the dispatcher task just after the main message
/// processing loop has been terminbated.
fn term(&mut self, rv: Option<RV>) -> Option<RV> {
rv
}
}
/// Launch a task that will process incoming messages from an ump-ng server
/// end-point.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<P, S, R, E, RV>(
mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> (Client<P, S, R, E>, JoinHandle<Option<RV>>)
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
RV: 'static + Send
{
let (server, client) = channel();
let weak_client = client.weak();
let jh = task::spawn(async move {
handler.init(weak_client);
let ret = loop {
match server.async_wait().await {
Ok(msg) => match msg {
MsgType::Put(m) => match handler.post(m).await {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
},
MsgType::Request(m, rctx) => match handler.req(m, rctx).await {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
}
},
Err(_) => break None
}
};
handler.term(ret)
});
(client, jh)
}
/// Spawn a task to run a pre-initialized handler.
///
/// It is assumed that the caller has initialized the handler, thus its
/// `init()` method will not be called.
pub fn spawn_preinit<P, S, R, E, RV>(
server: Server<P, S, R, E>,
mut handler: impl Handler<P, S, R, E, RV> + Send + 'static
) -> JoinHandle<Option<RV>>
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send,
RV: 'static + Send
{
task::spawn(async move {
let ret = loop {
match server.async_wait().await {
Ok(msg) => match msg {
MsgType::Put(m) => match handler.post(m).await {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
},
MsgType::Request(m, rctx) => match handler.req(m, rctx).await {
ControlFlow::Continue(_) => {}
ControlFlow::Break(rv) => break Some(rv)
}
},
Err(_) => break None
}
};
handler.term(ret)
})
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :