1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use crate::{
  err::Error,
  rctx::{RCtxState, ReplyContext}
};

/// Crate-internal form of a message as it is stored in the queue.
///
/// This is the item type pulled by [`Server`]; it is converted into the
/// public [`MsgType`] (via `From`) before being handed to the application.
pub(crate) enum InnerMsgType<P, S, R, E> {
  /// A uni-directional message carrying a payload of type `P`.
  Put(P),
  /// A request payload of type `S` paired with the queue-side context
  /// (`swctx::SetCtx`) that a [`ReplyContext`] is later built from.
  Request(S, swctx::SetCtx<R, RCtxState, E>)
}

/// Message operation types.
///
/// This is the application-facing form of a message received by a
/// [`Server`](crate::Server).
pub enum MsgType<P, S, R, E> {
  /// A uni-directional message pass.
  ///
  /// Carries only the payload (of type `P`); there is no reply context.
  Put(P),

  /// A message pass that expects a reply.
  ///
  /// Carries the request payload (of type `S`) and the [`ReplyContext`] that
  /// is used to pass a reply back to the requester.
  Request(S, ReplyContext<R, E>)
}

impl<P, S, R, E> From<InnerMsgType<P, S, R, E>> for MsgType<P, S, R, E> {
  /// Convert a queue-internal message into its application-facing form.
  ///
  /// `Put` payloads are passed through unchanged.  For `Request`, the
  /// queue-side context is wrapped in a [`ReplyContext`].
  fn from(val: InnerMsgType<P, S, R, E>) -> Self {
    match val {
      InnerMsgType::Put(payload) => Self::Put(payload),
      InnerMsgType::Request(payload, queued_ctx) => {
        // Building the application reply context from the one stored in the
        // queue implicitly changes the reply context's state from Queued to
        // Waiting.
        Self::Request(payload, ReplyContext::from(queued_ctx))
      }
    }
  }
}

/// Representation of a server object.
///
/// Each instantiation of a [`Server`] object represents an end-point which
/// will be used to receive messages from connected [`Client`](crate::Client)
/// objects.
///
/// This is a thin wrapper around a single `sigq::Puller`; every receive
/// method on [`Server`] pops from that puller.
#[repr(transparent)]
pub struct Server<P, S, R, E>(
  // Pulling side of the message queue.  Items are stored in their internal
  // form and converted to `MsgType` when they are taken off the queue.
  pub(crate) sigq::Puller<InnerMsgType<P, S, R, E>>
);

impl<P, S, R, E> Server<P, S, R, E>
where
  P: 'static + Send,
  S: 'static + Send,
  R: 'static + Send,
  E: 'static + Send
{
  /// Block and wait, indefinitely, for an incoming message from a
  /// [`Client`](crate::Client).
  ///
  /// Returns the message sent by the client and a reply context.  The server
  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
  /// value to the client.
  pub fn wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
    let msg = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;

    Ok(msg.into())
  }

  /// Take next next message off queue or return `None` is queue is empty.
  #[allow(clippy::type_complexity)]
  pub fn try_pop(&self) -> Result<Option<MsgType<P, S, R, E>>, Error<E>> {
    let msg = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;
    if let Some(msg) = msg {
      Ok(Some(msg.into()))
    } else {
      Ok(None)
    }
  }

  /// Same as [`Server::wait()`], but for use in an `async` context.
  pub async fn async_wait(&self) -> Result<MsgType<P, S, R, E>, Error<E>> {
    let msg = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;

    Ok(msg.into())
  }

  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
  /// really useful unless used in very specific situations.  It mostly exists
  /// for test cases.
  pub fn was_empty(&self) -> bool {
    self.0.was_empty()
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :