ump_ng/
client.rs

1use crate::{err::Error, rctx::RCtxState, server::InnerMsgType};
2
3
4/// Representation of a clonable client object.
5///
6/// Each instantiation of a `Client` object is itself an isolated client with
7/// regards to the server context.  By cloning a client a new independent
8/// client is created.  ("Independent" here meaning that it is still tied to
9/// the same server object, but the new client can be passed to a separate
10/// thread and can independently make calls to the server).
11#[repr(transparent)]
12pub struct Client<P, S, R, E>(
13  pub(crate) sigq::Pusher<InnerMsgType<P, S, R, E>>
14);
15
16impl<P, S, R, E> Client<P, S, R, E>
17where
18  P: 'static + Send,
19  R: 'static + Send,
20  E: 'static + Send
21{
22  /// Transmit a uni-directional message to the server end-point.
23  ///
24  /// # Errors
25  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
26  /// end-point has been dropped.
27  pub fn post(&self, msg: P) -> Result<(), Error<E>> {
28    self
29      .0
30      .push(InnerMsgType::Post(msg))
31      .map_err(|_| Error::ServerDisappeared)?;
32
33    Ok(())
34  }
35
36  /// Send a message to the server, wait for a reply, and return the reply.
37  ///
38  /// A complete round-trip (the message is delivered to the server, and the
39  /// server sends a reply) must complete before this function returns
40  /// success.
41  ///
42  /// This method is _currently_ reentrant: It is safe to use share a single
43  /// `Client` among multiple threads.  _This may change in the future_; it's
44  /// recommended to not rely on this.  The recommended way to send messages to
45  /// a server from multiple threads is to clone the `Client` and assign one
46  /// separate `Client` to each thread.
47  ///
48  /// # Return
49  /// On success the function will return `Ok(msg)`.
50  ///
51  /// # Errors
52  /// If the linked server object has been released, or is released while the
53  /// message is in the server's queue, [`Error::ServerDisappeared`] will be
54  /// returned.
55  ///
56  /// If the server never replied to the message and the
57  /// [`ReplyContext`](super::ReplyContext) was dropped [`Error::NoReply`] will
58  /// be returned.
59  ///
60  /// If an application specific error occurs it will be returned as a
61  /// `Err(Error::App(E))`, where `E` is the error type used when creating the
62  /// [`channel`](crate::channel).
63  pub fn req(&self, out: S) -> Result<R, Error<E>> {
64    // Create a per-call reply context.
65    // This context could be created when the Client object is being created
66    // and stored in the context, and thus be reused for reach client call.
67    // One side-effect is that some of the state semantics becomes more
68    // complicated.
69    // The central repo has such an implementation checked in, but it seems to
70    // have some more corner cases that aren't properly handled.
71    let (sctx, wctx) = swctx::mkpair();
72
73    self
74      .0
75      .push(InnerMsgType::Request(out, sctx))
76      .map_err(|_| Error::ServerDisappeared)?;
77
78    Ok(wctx.wait()?)
79  }
80
81  /// Issue a request, immediately returning a context that is used to wait for
82  /// the server's reply.
83  ///
84  /// The `_async` naming is slightly misleading -- this method isn't an
85  /// `async` in a language/`Future` sense, but rather it doesn't block and
86  /// wait for a reply before returning.  Instead it returns a [`WaitReply`]
87  /// object that is used to wait for the reply.
88  ///
89  /// This can be useful (in place of [`req()`](Client::req) or
90  /// [`areq()`](Client::areq()) methods) if the caller knows that the server
91  /// will take some time to respond to the request and the caller has other
92  /// tasks it can perform in the meantime.
93  ///
94  /// ```
95  /// use std::thread;
96  ///
97  /// use ump_ng::{channel, MsgType};
98  ///
99  /// let (server, client) = channel::<String, String, String, ()>();
100  ///
101  /// let server_thread = thread::spawn(move || {
102  ///   // Wait for data to arrive from a client
103  ///   println!("Server waiting for message ..");
104  ///   let MsgType::Request(data, mut rctx) = server.wait().unwrap() else {
105  ///     panic!("Unexpected message type");
106  ///   };
107  ///
108  ///   println!("Server received: '{}'", data);
109  ///
110  ///   // Long processing of data from client
111  ///
112  ///   // Reply to client
113  ///   let reply = format!("Hello, {}!", data);
114  ///   println!("Server replying '{}'", reply);
115  ///   rctx.reply(reply);
116  ///
117  ///   println!("Server done");
118  /// });
119  ///
120  /// let msg = String::from("Client");
121  /// println!("Client sending '{}'", msg);
122  /// let wrctx = client.req_async(msg).unwrap();
123  ///
124  /// // .. perform some operation while server is processing the request ..
125  ///
126  /// let reply = wrctx.wait().unwrap();
127  /// println!("Client received reply '{}'", reply);
128  /// println!("Client done");
129  ///
130  /// server_thread.join().unwrap();
131  /// ```
132  ///
133  /// # Errors
134  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
135  /// end-point has been dropped.
136  pub fn req_async(&self, out: S) -> Result<WaitReply<R, E>, Error<E>> {
137    let (sctx, wctx) = swctx::mkpair();
138    self
139      .0
140      .push(InnerMsgType::Request(out, sctx))
141      .map_err(|_| Error::ServerDisappeared)?;
142
143    Ok(WaitReply(wctx))
144  }
145
146  /// Same as [`Client::req()`], but for use in `async` contexts.
147  #[allow(clippy::missing_errors_doc)]
148  pub async fn areq(&self, out: S) -> Result<R, Error<E>>
149  where
150    S: Send
151  {
152    let (sctx, wctx) = swctx::mkpair();
153
154    self
155      .0
156      .push(InnerMsgType::Request(out, sctx))
157      .map_err(|_| Error::ServerDisappeared)?;
158
159    Ok(wctx.wait_async().await?)
160  }
161
162  /// Create a weak `Client` reference.
163  #[must_use]
164  pub fn weak(&self) -> Weak<P, S, R, E> {
165    Weak(self.0.weak())
166  }
167
168  /// Create a special-purpose client that can only perform post operations.
169  #[must_use]
170  pub fn postclient(&self) -> Post<P, S, R, E> {
171    Post(self.clone())
172  }
173}
174
175impl<P, S, R, E> Clone for Client<P, S, R, E> {
176  /// Clone a client.
177  ///
178  /// When a client is cloned the new object will be linked to the same server,
179  /// but in all other respects the clone is a completely independent client.
180  ///
181  /// This means that a cloned client can be passed to a new thread/task and
182  /// make new independent calls to the server without any risk of collision
183  /// between clone and the original client object.
184  fn clone(&self) -> Self {
185    Self(self.0.clone())
186  }
187}
188
189
190/// Context used to wait for a server to reply to a request.
191pub struct WaitReply<R, E>(swctx::WaitCtx<R, RCtxState, E>);
192
193impl<R, E> WaitReply<R, E> {
194  /// Block and wait for a reply.
195  ///
196  /// For use in non-`async` threads.
197  ///
198  /// # Errors
199  /// [`Error::ServerDisappeared`] means the linked server object has been
200  /// released.
201  ///
202  /// If the [`ReplyContext`](super::ReplyContext) is dropped by the server
203  /// handler it replies to the message, [`Error::NoReply`] will be returned.
204  ///
205  /// If an application specific error occurs it will be returned in
206  /// [`Error::App`].
207  pub fn wait(self) -> Result<R, Error<E>> {
208    Ok(self.0.wait()?)
209  }
210
211  /// Block and wait for a reply.
212  ///
213  /// Same as [`WaitReply::wait()`], but for use in `async` contexts.
214  #[allow(clippy::missing_errors_doc)]
215  pub async fn wait_async(self) -> Result<R, Error<E>>
216  where
217    R: Send,
218    E: Send
219  {
220    Ok(self.0.wait_async().await?)
221  }
222}
223
224
225/// A weak client reference that can be upgraded to a [`Client`] as long as
226/// other `Client` objects till exist.
227#[repr(transparent)]
228pub struct Weak<P, S, R, E>(
229  pub(crate) sigq::WeakPusher<InnerMsgType<P, S, R, E>>
230);
231
232impl<P, S, R, E> Clone for Weak<P, S, R, E> {
233  fn clone(&self) -> Self {
234    Self(self.0.clone())
235  }
236}
237
238impl<P, S, R, E> Weak<P, S, R, E> {
239  #[must_use]
240  pub fn upgrade(&self) -> Option<Client<P, S, R, E>> {
241    self.0.upgrade().map(|x| Client(x))
242  }
243}
244
245
246/// Special purpose client end-point that can only issue `Post` requests.
247#[derive(Clone)]
248#[repr(transparent)]
249pub struct Post<P, S, R, E>(Client<P, S, R, E>);
250
251impl<P, S, R, E> Post<P, S, R, E>
252where
253  P: 'static + Send,
254  S: 'static + Send,
255  R: 'static + Send,
256  E: 'static + Send
257{
258  /// Transmit a uni-directional message to the server end-point.
259  ///
260  /// # Errors
261  /// [`Error::ServerDisappeared`] means the [`Server`](super::Server)
262  /// end-point has been dropped.
263  pub fn post(&self, msg: P) -> Result<(), Error<E>> {
264    self.0.post(msg)
265  }
266}
267
268// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :