1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
use crate::{
err::Error,
rctx::{RCtxState, ReplyContext}
};
/// Crate-internal representation of a message while it sits in the queue.
///
/// Has the same two variants as [`MsgType`], except that `Request` holds the
/// raw `swctx::SetCtx` rather than the application-facing [`ReplyContext`].
/// It is turned into a [`MsgType`] through the `From` implementation in this
/// file.
pub(crate) enum InnerMsgType<P, S, R, E> {
  /// A uni-directional message carrying a payload of type `P`.
  Put(P),

  /// A message of type `S` that expects a reply, paired with the low-level
  /// context through which the reply (`R`) or error (`E`) is delivered.
  Request(S, swctx::SetCtx<R, RCtxState, E>),
}
/// Message operation types.
///
/// This is the form in which messages are handed to the application by the
/// [`Server`] receive methods.
pub enum MsgType<P, S, R, E> {
  /// A uni-directional message pass; carries a payload of type `P` and no
  /// reply context.
  Put(P),

  /// A message pass that expects a reply; carries the request payload of
  /// type `S` together with the [`ReplyContext`] used to answer it.
  Request(S, ReplyContext<R, E>),
}
impl<P, S, R, E> From<InnerMsgType<P, S, R, E>> for MsgType<P, S, R, E> {
  /// Convert a queued, crate-internal message into the public message type.
  ///
  /// `Put` payloads are passed through untouched.  For `Request` the
  /// queue-side reply context is wrapped in an application-side
  /// [`ReplyContext`].
  fn from(val: InnerMsgType<P, S, R, E>) -> Self {
    match val {
      InnerMsgType::Put(payload) => Self::Put(payload),
      InnerMsgType::Request(payload, queued_ctx) => {
        // Building the application reply context out of the one stored in
        // the queue implicitly moves the reply context's state from Queued
        // to Waiting.
        Self::Request(payload, ReplyContext::from(queued_ctx))
      }
    }
  }
}
/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.
///
/// The type is a transparent wrapper around the pulling end of the message
/// queue; queued items are crate-internal and are converted to [`MsgType`]
/// before being returned to the application.
#[repr(transparent)]
pub struct Server<P, S, R, E>(
  /// Receiving end of the queue that messages are pulled from.
  pub(crate) sigq::Puller<InnerMsgType<P, S, R, E>>,
);
impl<P, S, R, E> Server<P, S, R, E>
where
P: 'static + Send,
S: 'static + Send,
R: 'static + Send,
E: 'static + Send
{
/// Block and wait, indefinitely, for an incoming message from a
/// [`Client`](crate::Client).
///
/// Returns the message sent by the client and a reply context. The server
/// must call [`ReplyContext::reply()`] on the reply context to pass a return
/// value to the client.
pub fn wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;
Ok(msg.into())
}
/// Take next next message off queue or return `None` is queue is empty.
#[allow(clippy::type_complexity)]
pub fn try_pop(&self) -> Result<Option<MsgType<P, S, R, E>>, Error<E>> {
let msg = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;
if let Some(msg) = msg {
Ok(Some(msg.into()))
} else {
Ok(None)
}
}
/// Same as [`Server::wait()`], but for use in an `async` context.
pub async fn async_wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
let msg = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;
Ok(msg.into())
}
/// Returns a boolean indicating whether the queue is/was empty. This isn't
/// really useful unless used in very specific situations. It mostly exists
/// for test cases.
pub fn was_empty(&self) -> bool {
self.0.was_empty()
}
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :