ump_ng/client.rs
1use crate::{err::Error, rctx::RCtxState, server::InnerMsgType};
2
3
4/// Representation of a clonable client object.
5///
6/// Each instantiation of a `Client` object is itself an isolated client with
7/// regards to the server context. By cloning a client a new independent
8/// client is created. ("Independent" here meaning that it is still tied to
9/// the same server object, but the new client can be passed to a separate
10/// thread and can independently make calls to the server).
11#[repr(transparent)]
12pub struct Client<P, S, R, E>(
13 pub(crate) sigq::Pusher<InnerMsgType<P, S, R, E>>
14);
15
16impl<P, S, R, E> Client<P, S, R, E>
17where
18 P: 'static + Send,
19 R: 'static + Send,
20 E: 'static + Send
21{
22 /// Transmit a uni-directional message to the server end-point.
23 ///
24 /// # Errors
25 /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
26 /// end-point has been dropped.
27 pub fn post(&self, msg: P) -> Result<(), Error<E>> {
28 self
29 .0
30 .push(InnerMsgType::Post(msg))
31 .map_err(|_| Error::ServerDisappeared)?;
32
33 Ok(())
34 }
35
36 /// Send a message to the server, wait for a reply, and return the reply.
37 ///
38 /// A complete round-trip (the message is delivered to the server, and the
39 /// server sends a reply) must complete before this function returns
40 /// success.
41 ///
42 /// This method is _currently_ reentrant: It is safe to use share a single
43 /// `Client` among multiple threads. _This may change in the future_; it's
44 /// recommended to not rely on this. The recommended way to send messages to
45 /// a server from multiple threads is to clone the `Client` and assign one
46 /// separate `Client` to each thread.
47 ///
48 /// # Return
49 /// On success the function will return `Ok(msg)`.
50 ///
51 /// # Errors
52 /// If the linked server object has been released, or is released while the
53 /// message is in the server's queue, [`Error::ServerDisappeared`] will be
54 /// returned.
55 ///
56 /// If the server never replied to the message and the
57 /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will
58 /// be returned.
59 ///
60 /// If an application specific error occurs it will be returned as a
61 /// `Err(Error::App(E))`, where `E` is the error type used when creating the
62 /// [`channel`](crate::channel).
63 pub fn req(&self, out: S) -> Result<R, Error<E>> {
64 // Create a per-call reply context.
65 // This context could be created when the Client object is being created
66 // and stored in the context, and thus be reused for reach client call.
67 // One side-effect is that some of the state semantics becomes more
68 // complicated.
69 // The central repo has such an implementation checked in, but it seems to
70 // have some more corner cases that aren't properly handled.
71 let (sctx, wctx) = swctx::mkpair();
72
73 self
74 .0
75 .push(InnerMsgType::Request(out, sctx))
76 .map_err(|_| Error::ServerDisappeared)?;
77
78 Ok(wctx.wait()?)
79 }
80
81 /// Issue a request, immediately returning a context that is used to wait for
82 /// the server's reply.
83 ///
84 /// The `_async` naming is slightly misleading -- this method isn't an
85 /// `async` in a language/`Future` sense, but rather it doesn't block and
86 /// wait for a reply before returning. Instead it returns a [`WaitReply`]
87 /// object that is used to wait for the reply.
88 ///
89 /// This can be useful (in place of [`req()`](Client::req) or
90 /// [`areq()`](Client::areq()) methods) if the caller knows that the server
91 /// will take some time to respond to the request and the caller has other
92 /// tasks it can perform in the meantime.
93 ///
94 /// ```
95 /// use std::thread;
96 ///
97 /// use ump_ng::{channel, MsgType};
98 ///
99 /// let (server, client) = channel::<String, String, String, ()>();
100 ///
101 /// let server_thread = thread::spawn(move || {
102 /// // Wait for data to arrive from a client
103 /// println!("Server waiting for message ..");
104 /// let MsgType::Request(data, mut rctx) = server.wait().unwrap() else {
105 /// panic!("Unexpected message type");
106 /// };
107 ///
108 /// println!("Server received: '{}'", data);
109 ///
110 /// // Long processing of data from client
111 ///
112 /// // Reply to client
113 /// let reply = format!("Hello, {}!", data);
114 /// println!("Server replying '{}'", reply);
115 /// rctx.reply(reply);
116 ///
117 /// println!("Server done");
118 /// });
119 ///
120 /// let msg = String::from("Client");
121 /// println!("Client sending '{}'", msg);
122 /// let wrctx = client.req_async(msg).unwrap();
123 ///
124 /// // .. perform some operation while server is processing the request ..
125 ///
126 /// let reply = wrctx.wait().unwrap();
127 /// println!("Client received reply '{}'", reply);
128 /// println!("Client done");
129 ///
130 /// server_thread.join().unwrap();
131 /// ```
132 ///
133 /// # Errors
134 /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
135 /// end-point has been dropped.
136 pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
137 let (sctx, wctx) = swctx::mkpair();
138 self
139 .0
140 .push(InnerMsgType::Request(out, sctx))
141 .map_err(|_| Error::ServerDisappeared)?;
142
143 Ok(WaitReply(wctx))
144 }
145
146 /// Same as [`Client::req()`], but for use in `async` contexts.
147 #[allow(clippy::missing_errors_doc)]
148 pub async fn areq(&self, out: S) -> Result<R, Error<E>>
149 where
150 S: Send
151 {
152 let (sctx, wctx) = swctx::mkpair();
153
154 self
155 .0
156 .push(InnerMsgType::Request(out, sctx))
157 .map_err(|_| Error::ServerDisappeared)?;
158
159 Ok(wctx.wait_async().await?)
160 }
161
162 /// Create a weak `Client` reference.
163 #[must_use]
164 pub fn weak(&self) -> Weak<P, S, R, E> {
165 Weak(self.0.weak())
166 }
167
168 /// Create a special-purpose client that can only perform post operations.
169 #[must_use]
170 pub fn postclient(&self) -> Post<P, S, R, E> {
171 Post(self.clone())
172 }
173}
174
175impl<P, S, R, E> Clone for Client<P, S, R, E> {
176 /// Clone a client.
177 ///
178 /// When a client is cloned the new object will be linked to the same server,
179 /// but in all other respects the clone is a completely independent client.
180 ///
181 /// This means that a cloned client can be passed to a new thread/task and
182 /// make new independent calls to the server without any risk of collision
183 /// between clone and the original client object.
184 fn clone(&self) -> Self {
185 Self(self.0.clone())
186 }
187}
188
189
190/// Context used to wait for a server to reply to a request.
191pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);
192
193impl<R, E> WaitReply<R, E> {
194 /// Block and wait for a reply.
195 ///
196 /// For use in non-`async` threads.
197 ///
198 /// # Errors
199 /// [`Error::ServerDisappeared`] means the linked server object has been
200 /// released.
201 ///
202 /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
203 /// handler it replies to the message, [`Error::NoReply`] will be returned.
204 ///
205 /// If an application specific error occurs it will be returned in
206 /// [`Error::App`].
207 pub fn wait(self) -> Result<R, Error<E>> {
208 Ok(self.0.wait()?)
209 }
210
211 /// Block and wait for a reply.
212 ///
213 /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
214 #[allow(clippy::missing_errors_doc)]
215 pub async fn wait_async(self) -> Result<R, Error<E>>
216 where
217 R: Send,
218 E: Send
219 {
220 Ok(self.0.wait_async().await?)
221 }
222}
223
224
225/// A weak client reference that can be upgraded to a [`Client`] as long as
226/// other `Client` objects till exist.
227#[repr(transparent)]
228pub struct Weak<P, S, R, E>(
229 pub(crate) sigq::WeakPusher<InnerMsgType<P, S, R, E>>
230);
231
232impl<P, S, R, E> Clone for Weak<P, S, R, E> {
233 fn clone(&self) -> Self {
234 Self(self.0.clone())
235 }
236}
237
238impl<P, S, R, E> Weak<P, S, R, E> {
239 #[must_use]
240 pub fn upgrade(&self) -> Option<Client<P, S, R, E>> {
241 self.0.upgrade().map(|x| Client(x))
242 }
243}
244
245
246/// Special purpose client end-point that can only issue `Post` requests.
247#[derive(Clone)]
248#[repr(transparent)]
249pub struct Post<P, S, R, E>(Client<P, S, R, E>);
250
251impl<P, S, R, E> Post<P, S, R, E>
252where
253 P: 'static + Send,
254 S: 'static + Send,
255 R: 'static + Send,
256 E: 'static + Send
257{
258 /// Transmit a uni-directional message to the server end-point.
259 ///
260 /// # Errors
261 /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
262 /// end-point has been dropped.
263 pub fn post(&self, msg: P) -> Result<(), Error<E>> {
264 self.0.post(msg)
265 }
266}
267
268// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :