ump/server.rs
1use crate::{
2 err::Error,
3 rctx::{RCtxState, ReplyContext}
4};
5
6pub struct QueueNode<S, R, E> {
7 /// Raw message being sent from the client to the server.
8 pub(crate) msg: S,
9
10 /// Keep track of data needed to share reply data.
11 pub(crate) reply: swctx::SetCtx<R, RCtxState, E>
12}
13
14/// Representation of a server object.
15///
16/// Each instantiation of a [`Server`] object represents an end-point which
17/// will be used to receive messages from connected [`Client`](crate::Client)
18/// objects.
19#[repr(transparent)]
20pub struct Server<S, R, E>(pub(crate) sigq::Puller<QueueNode<S, R, E>>);
21
22impl<S, R, E> Server<S, R, E>
23where
24 S: 'static + Send,
25 R: 'static + Send,
26 E: 'static + Send
27{
28 /// Block and wait, indefinitely, for an incoming message from a
29 /// [`Client`](crate::Client).
30 ///
31 /// Returns the message sent by the client and a reply context. The server
32 /// must call [`ReplyContext::reply()`] on the reply context to pass a return
33 /// value to the client.
34 ///
35 /// # Errors
36 /// `Err(Error::ClientsDisappeared)` indicates that the queue is empty and
37 /// all the client end-points have been dropped.
38 pub fn wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
39 let node = self.0.pop().map_err(|_| Error::ClientsDisappeared)?;
40
41 // Extract the data from the node
42 let msg = node.msg;
43
44 // Create an application reply context from the reply context in the queue
45 // Implicitly changes state of the reply context from Queued to Waiting
46 let rctx = ReplyContext::try_from(node.reply)?;
47
48 Ok((msg, rctx))
49 }
50
51 /// Take next next message off queue or return `None` is queue is empty.
52 ///
53 /// # Errors
54 /// [`Error::ClientsDisappeared`] indicates that the queue is empty and
55 /// all the client end-points have been dropped.
56 #[allow(clippy::type_complexity)]
57 pub fn try_pop(&self) -> Result<Option<(S, ReplyContext<R, E>)>, Error<E>> {
58 let node = self.0.try_pop().map_err(|_| Error::ClientsDisappeared)?;
59
60 if let Some(node) = node {
61 // Extract the data from the node
62 let msg = node.msg;
63
64 // Create an application reply context from the reply context in the
65 // queue Implicitly changes state of the reply context from Queued
66 // to Waiting
67 let rctx = ReplyContext::try_from(node.reply)?;
68
69 Ok(Some((msg, rctx)))
70 } else {
71 Ok(None)
72 }
73 }
74
75 /// Same as [`Server::wait()`], but for use in an `async` context.
76 #[allow(clippy::missing_errors_doc)]
77 pub async fn async_wait(&self) -> Result<(S, ReplyContext<R, E>), Error<E>> {
78 let node = self.0.apop().await.map_err(|_| Error::ClientsDisappeared)?;
79
80 // Extract the data from the node
81 let msg = node.msg;
82
83 // Create an application reply context from the reply context in the queue
84 // Implicitly changes state of the reply context from Queued to Waiting
85 let rctx = ReplyContext::try_from(node.reply)?;
86
87 Ok((msg, rctx))
88 }
89
90 /// Returns a boolean indicating whether the queue is/was empty. This isn't
91 /// really useful unless used in very specific situations. It mostly exists
92 /// for test cases.
93 #[must_use]
94 pub fn was_empty(&self) -> bool {
95 self.0.was_empty()
96 }
97}
98
99// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :