//! ump server running in an async task.
use std::ops::ControlFlow;
use tokio::task::{self, JoinHandle};
use async_trait::async_trait;
use super::{channel, Client, ReplyContext};
/// Message processing trait for an async handler.
///
/// An implementation is handed to [`spawn()`], which drives it from a
/// dispatcher task: `init()` is called once, then `proc_req()` is called for
/// each received request, and finally `term()` is called once the dispatch
/// loop has ended.
///
/// Type parameters:
/// - `S` is the type of the request message passed to `proc_req()`.
/// - `R` and `E` parameterize the [`ReplyContext`] that accompanies each
///   request.
/// - `RV` is the type of the value a handler can break the dispatch loop
///   with.
#[async_trait]
pub trait Handler<S, R, E, RV> {
/// Optional initialization callback.
///
/// This is called on the dispatcher task before the main message dispatch
/// loop is entered.
///
/// `weak_client` is a weak client handle; [`spawn()`] creates it from the
/// client it returns to its caller.
///
/// The default implementation does nothing.
// The default implementation ignores `weak_client`, hence the lint override.
// NOTE(review): `WeakClient` is named through `ump::` while the other types
// in this file come from `super::` -- confirm this path resolves here.
#[allow(unused_variables)]
fn init(&mut self, weak_client: ump::WeakClient<S, R, E>) {}
/// Message processing callback.
///
/// `msg` is the received request and `rctx` is the [`ReplyContext`] that
/// arrived together with it.
///
/// The callback must return `ControlFlow::Continue(())` to keep the
/// dispatcher loop going. Returning `ControlFlow::Break(RV)` will cause the
/// dispatcher loop to abort and return the value in `RV` from the task.
async fn proc_req(
&mut self,
msg: S,
rctx: ReplyContext<R, E>
) -> ControlFlow<RV, ()>;
/// Optional termination callback.
///
/// This is called on the dispatcher task just after the main message
/// processing loop has been terminated.
///
/// The `rv` argument is set to the return value returned from the dispatcher
/// loop. It will be set to `Some(RV)` if a request handler returned
/// `ControlFlow::Break(RV)`. It will be set to `None` if the dispatch loop
/// terminated because the queue is empty and all of the linked clients have
/// been dropped.
///
/// The value returned from this callback is returned from the dispatcher
/// task when it is joined.
///
/// The default implementation simply returns the `rv` parameter.
fn term(&mut self, rv: Option<RV>) -> Option<RV> {
rv
}
}
/// Launch an async task that serves the requests arriving at an ump server
/// end-point by dispatching each of them to `handler`.
///
/// The spawned task performs the following steps:
/// 1. [`Handler::init()`] is called with a weak reference to the returned
///    client.
/// 2. Each received request is passed to [`Handler::proc_req()`]. This
///    repeats until the callback returns `ControlFlow::Break`, or until
///    waiting for the next request fails.
/// 3. [`Handler::term()`] is called with `Some(value)` if the loop was broken
///    by the handler, or `None` otherwise. Its return value becomes the
///    result of the task.
///
/// Returns the [`Client`] connected to the server end-point, together with
/// the `JoinHandle` used to collect the task's result.
///
/// The task is started using `tokio::task::spawn()`, so this function must be
/// called from within a tokio runtime context.
///
/// See top module's documentation for an overview of the [dispatch
/// loop](crate#dispatch-loop).
pub fn spawn<S, R, E, RV>(
  mut handler: impl Handler<S, R, E, RV> + Send + 'static
) -> (Client<S, R, E>, JoinHandle<Option<RV>>)
where
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send,
  RV: 'static + Send
{
  let (server, client) = channel();

  // Only a weak handle is given to the handler; the strong client is
  // returned to the caller.
  let weak = client.weak();

  let dispatcher = async move {
    handler.init(weak);

    // Stays `None` unless the handler explicitly breaks out of the loop.
    let mut outcome = None;
    loop {
      let (msg, rctx) = match server.async_wait().await {
        Ok(request) => request,
        // No request could be received; leave the loop without a value.
        Err(_) => break
      };

      if let ControlFlow::Break(rv) = handler.proc_req(msg, rctx).await {
        outcome = Some(rv);
        break;
      }
    }

    handler.term(outcome)
  };

  let jh = task::spawn(dispatcher);
  (client, jh)
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :