ump_ng/
rctx.rs

1//! Allow a thread/task, crossing sync/async boundaries in either direction, to
2//! deliver an expected piece of data to another thread/task, with
3//! notification.
4//!
5//! These are simple channels used to deliver data from one endpoint to
6//! another, where the receiver will block until data has been delivered.
7
8use crate::err::Error;
9
10#[derive(Clone, Default)]
11pub enum RCtxState {
12  #[default]
13  Queued,
14  Active
15}
16
17/// Object used to reply to requests passed to the server.
18pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>);
19
20impl<T, E> ReplyContext<T, E> {
21  /// Send a reply back to originating client.
22  ///
23  /// # Example
24  /// ```
25  /// use std::thread;
26  /// use ump_ng::{channel, MsgType};
27  ///
28  /// let (server, client) = channel::<(), String, String, ()>();
29  /// let server_thread = thread::spawn(move || {
30  ///   let MsgType::Request(data, rctx) = server.wait().unwrap() else {
31  ///     panic!("Not Request type");
32  ///   };
33  ///   let reply = format!("Hello, {}!", data);
34  ///   rctx.reply(reply).unwrap();
35  /// });
36  /// let msg = String::from("Client");
37  /// let reply = client.req(msg).unwrap();
38  /// assert_eq!(reply, "Hello, Client!");
39  /// server_thread.join().unwrap();
40  /// ```
41  ///
42  /// # Errors
43  /// If the originating caller is no longer waiting for a reply (i.e. was
44  /// dropped) [`Error::OriginDisappeared`] is returned.
45  ///
46  /// # Semantics
47  /// This call is safe to make after the server context has been released.
48  pub fn reply(self, data: T) -> Result<(), Error<E>> {
49    self.0.set(data)?;
50    Ok(())
51  }
52
53  /// Return an error to originating client.
54  /// This will cause the calling client to return an error.  The error passed
55  /// in the `err` parameter will be wrapped in a `Error::App(err)`.
56  ///
57  /// # Example
58  ///
59  /// ```
60  /// use std::thread;
61  /// use ump_ng::{channel, Error, MsgType};
62  ///
63  /// #[derive(Debug, PartialEq)]
64  /// enum MyError {
65  ///   SomeError(String)
66  /// }
67  ///
68  /// let (server, client) = channel::<String, String, String, MyError>();
69  /// let server_thread = thread::spawn(move || {
70  ///   let MsgType::Request(_, rctx) = server.wait().unwrap() else {
71  ///     panic!("Unexpected message type");
72  ///   };
73  ///   rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
74  /// });
75  /// let msg = String::from("Client");
76  /// let reply = client.req(msg);
77  /// match reply {
78  ///   Err(Error::App(MyError::SomeError(s))) => {
79  ///     assert_eq!(s, "failed");
80  ///   }
81  ///   _ => {
82  ///     panic!("Unexpected return value");
83  ///   }
84  /// }
85  /// server_thread.join().unwrap();
86  /// ```
87  ///
88  /// # Errors
89  /// [`Error::OriginDisappeared`] indicates that the originating caller is no
90  /// longer waiting for a reply (i.e. it was dropped).
91  ///
92  /// # Semantics
93  /// This call is safe to make after the server context has been released.
94  pub fn fail(self, err: E) -> Result<(), Error<E>> {
95    self.0.fail(err)?;
96    Ok(())
97  }
98}
99
100impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
101  type Error = Error<E>;
102
103  /// Convert a `SetCtx` into a `ReplyContext`.
104  ///
105  /// Sets the `SetCtx`'s stat to _Active_.
106  fn try_from(
107    sctx: swctx::SetCtx<T, RCtxState, E>
108  ) -> Result<Self, Self::Error> {
109    let _ = sctx.set_state(RCtxState::Active);
110    Ok(Self(sctx))
111  }
112}
113
114// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :