ump_ng/rctx.rs
1//! Allow a thread/task, crossing sync/async boundaries in either direction, to
2//! deliver an expected piece of data to another thread/task, with
3//! notification.
4//!
5//! These are simple channels used to deliver data from one endpoint to
6//! another, where the receiver will block until data has been delivered.
7
8use crate::err::Error;
9
/// State tag stored in the `swctx::SetCtx` that backs a [`ReplyContext`].
///
/// The derived traits make the state printable (`Debug`) and comparable
/// (`PartialEq`/`Eq`) so it can be logged and used in assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum RCtxState {
  /// Initial (default) state.
  ///
  /// NOTE(review): presumably means the request has been submitted but not
  /// yet picked up by the server -- confirm against the queueing code.
  #[default]
  Queued,

  /// State set when the `SetCtx` is converted into a [`ReplyContext`] (see
  /// the `TryFrom` implementation in this module).
  Active
}
16
17/// Object used to reply to requests passed to the server.
18pub struct ReplyContext<T, E>(swctx::SetCtx<T, RCtxState, E>);
19
20impl<T, E> ReplyContext<T, E> {
21 /// Send a reply back to originating client.
22 ///
23 /// # Example
24 /// ```
25 /// use std::thread;
26 /// use ump_ng::{channel, MsgType};
27 ///
28 /// let (server, client) = channel::<(), String, String, ()>();
29 /// let server_thread = thread::spawn(move || {
30 /// let MsgType::Request(data, rctx) = server.wait().unwrap() else {
31 /// panic!("Not Request type");
32 /// };
33 /// let reply = format!("Hello, {}!", data);
34 /// rctx.reply(reply).unwrap();
35 /// });
36 /// let msg = String::from("Client");
37 /// let reply = client.req(msg).unwrap();
38 /// assert_eq!(reply, "Hello, Client!");
39 /// server_thread.join().unwrap();
40 /// ```
41 ///
42 /// # Errors
43 /// If the originating caller is no longer waiting for a reply (i.e. was
44 /// dropped) [`Error::OriginDisappeared`] is returned.
45 ///
46 /// # Semantics
47 /// This call is safe to make after the server context has been released.
48 pub fn reply(self, data: T) -> Result<(), Error<E>> {
49 self.0.set(data)?;
50 Ok(())
51 }
52
53 /// Return an error to originating client.
54 /// This will cause the calling client to return an error. The error passed
55 /// in the `err` parameter will be wrapped in a `Error::App(err)`.
56 ///
57 /// # Example
58 ///
59 /// ```
60 /// use std::thread;
61 /// use ump_ng::{channel, Error, MsgType};
62 ///
63 /// #[derive(Debug, PartialEq)]
64 /// enum MyError {
65 /// SomeError(String)
66 /// }
67 ///
68 /// let (server, client) = channel::<String, String, String, MyError>();
69 /// let server_thread = thread::spawn(move || {
70 /// let MsgType::Request(_, rctx) = server.wait().unwrap() else {
71 /// panic!("Unexpected message type");
72 /// };
73 /// rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
74 /// });
75 /// let msg = String::from("Client");
76 /// let reply = client.req(msg);
77 /// match reply {
78 /// Err(Error::App(MyError::SomeError(s))) => {
79 /// assert_eq!(s, "failed");
80 /// }
81 /// _ => {
82 /// panic!("Unexpected return value");
83 /// }
84 /// }
85 /// server_thread.join().unwrap();
86 /// ```
87 ///
88 /// # Errors
89 /// [`Error::OriginDisappeared`] indicates that the originating caller is no
90 /// longer waiting for a reply (i.e. it was dropped).
91 ///
92 /// # Semantics
93 /// This call is safe to make after the server context has been released.
94 pub fn fail(self, err: E) -> Result<(), Error<E>> {
95 self.0.fail(err)?;
96 Ok(())
97 }
98}
99
100impl<T, E> TryFrom<swctx::SetCtx<T, RCtxState, E>> for ReplyContext<T, E> {
101 type Error = Error<E>;
102
103 /// Convert a `SetCtx` into a `ReplyContext`.
104 ///
105 /// Sets the `SetCtx`'s stat to _Active_.
106 fn try_from(
107 sctx: swctx::SetCtx<T, RCtxState, E>
108 ) -> Result<Self, Self::Error> {
109 let _ = sctx.set_state(RCtxState::Active);
110 Ok(Self(sctx))
111 }
112}
113
114// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :