ump/
server.rs

1use crate::{
2  err::Error,
3  rctx::{RCtxState, ReplyContext}
4};
5
6pub struct QueueNode<S, R, E> {
7  /// Raw message being sent from the client to the server.
8  pub(crate) msg: S,
9
10  /// Keep track of data needed to share reply data.
11  pub(crate) reply: swctx::SetCtx<R, RCtxState, E>
12}
13
14/// Representation of a server object.
15///
16/// Each instantiation of a [`Server`] object represents an end-point which
17/// will be used to receive messages from connected [`Client`](crate::Client)
18/// objects.
19#[repr(transparent)]
20pub struct Server<S, R, E>(pub(crate) sigq::Puller<QueueNode<S, R, E>>);
21
22impl<S, R, E> Server<S, R, E>
23where
24  S: 'static + Send,
25  R: 'static + Send,
26  E: 'static + Send
27{
28  /// Block and wait, indefinitely, for an incoming message from a
29  /// [`Client`](crate::Client).
30  ///
31  /// Returns the message sent by the client and a reply context.  The server
32  /// must call [`ReplyContext::reply()`] on the reply context to pass a return
33  /// value to the client.
34  ///
35  /// # Errors
36  /// `Err(Error::ClientsDisappeared)` indicates that the queue is empty and
37  /// all the client end-points have been dropped.
38  pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
39    let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;
40
41    // Extract the data from the node
42    let msg = node.msg;
43
44    // Create an application reply context from the reply context in the queue
45    // Implicitly changes state of the reply context from Queued to Waiting
46    let rctx = ReplyContext::try_from(node.reply)?;
47
48    Ok((msg, rctx))
49  }
50
51  /// Take next next message off queue or return `None` is queue is empty.
52  ///
53  /// # Errors
54  /// [`Error::ClientsDisappeared`] indicates that the queue is empty and
55  /// all the client end-points have been dropped.
56  #[allow(clippy::type_complexity)]
57  pub fn try_pop(&self) -> Result<Option<(S, ReplyContext<R, E>)>, Error<E>> {
58    let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;
59
60    if let Some(node) = node {
61      // Extract the data from the node
62      let msg = node.msg;
63
64      // Create an application reply context from the reply context in the
65      // queue Implicitly changes state of the reply context from Queued
66      // to Waiting
67      let rctx = ReplyContext::try_from(node.reply)?;
68
69      Ok(Some((msg, rctx)))
70    } else {
71      Ok(None)
72    }
73  }
74
75  /// Same as [`Server::wait()`], but for use in an `async` context.
76  #[allow(clippy::missing_errors_doc)]
77  pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
78    let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;
79
80    // Extract the data from the node
81    let msg = node.msg;
82
83    // Create an application reply context from the reply context in the queue
84    // Implicitly changes state of the reply context from Queued to Waiting
85    let rctx = ReplyContext::try_from(node.reply)?;
86
87    Ok((msg, rctx))
88  }
89
90  /// Returns a boolean indicating whether the queue is/was empty.  This isn't
91  /// really useful unless used in very specific situations.  It mostly exists
92  /// for test cases.
93  #[must_use]
94  pub fn was_empty(&self) -> bool {
95    self.0.was_empty()
96  }
97}
98
99// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :