1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
use crate::rctx::err::Error;
use crate::rctx::inner::State;
use crate::rctx::InnerReplyContext;
/// The sending half of a reply channel, as exposed to application code.
///
/// A holder of a `ReplyContext` can only push a result into the channel (via
/// `reply()` or `fail()`).  Nothing in its public interface allows the value
/// to be read back out, which makes it safe to hand to code that is meant to
/// answer a request but must not be able to extract the answer.
pub struct ReplyContext<I, E> {
  /// Internal reply context that `reply()` and `fail()` write their result
  /// to.
  inner: InnerReplyContext<I, E>,

  /// Set to `true` once `reply()` or `fail()` has handed a result over.  The
  /// `Drop` implementation only reports a missing reply while this is still
  /// `false`.
  did_handover: bool,
}
impl<I: 'static + Send, E> ReplyContext<I, E> {
  /// Deliver a successful reply to the client that issued the request.
  ///
  /// Consumes the context.  `data` is handed to the internal reply context,
  /// after which the context is flagged as handed over so that dropping it
  /// does not additionally report a missing reply.
  ///
  /// # Errors
  /// As currently written this method always returns `Ok(())`.
  ///
  /// # Example
  /// ```
  /// use std::thread;
  /// use ump::channel;
  ///
  /// fn main() {
  ///   let (server, client) = channel::<String, String, ()>();
  ///   let server_thread = thread::spawn(move || {
  ///     let (data, rctx) = server.wait();
  ///     let reply = format!("Hello, {}!", data);
  ///     rctx.reply(reply).unwrap();
  ///   });
  ///   let msg = String::from("Client");
  ///   let reply = client.send(String::from(msg)).unwrap();
  ///   assert_eq!(reply, "Hello, Client!");
  ///   server_thread.join().unwrap();
  /// }
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn reply(mut self, data: I) -> Result<(), Error<E>> {
    self.inner.put(data);

    // Must come after the hand-over above: `Drop` checks this flag to decide
    // whether a missing reply has to be reported.
    self.did_handover = true;

    Ok(())
  }

  /// Deliver an application-defined error to the client that issued the
  /// request.
  ///
  /// Consumes the context.  The calling client will receive an error instead
  /// of a reply; the value passed in `err` is wrapped in `Error::App(err)`
  /// on the client side (see the example below).
  ///
  /// # Errors
  /// As currently written this method always returns `Ok(())`.
  ///
  /// # Example
  ///
  /// ```
  /// use std::thread;
  /// use ump::{channel, Error};
  ///
  /// #[derive(Debug, PartialEq)]
  /// enum MyError {
  ///   SomeError(String)
  /// }
  ///
  /// fn main() {
  ///   let (server, client) = channel::<String, String, MyError>();
  ///   let server_thread = thread::spawn(move || {
  ///     let (_, rctx) = server.wait();
  ///     rctx.fail(MyError::SomeError("failed".to_string())).unwrap();
  ///   });
  ///   let msg = String::from("Client");
  ///   let reply = client.send(String::from(msg));
  ///   match reply {
  ///     Err(Error::App(MyError::SomeError(s))) => {
  ///       assert_eq!(s, "failed");
  ///     }
  ///     _ => {
  ///       panic!("Unexpected return value");
  ///     }
  ///   }
  ///   server_thread.join().unwrap();
  /// }
  /// ```
  ///
  /// # Semantics
  /// This call is safe to make after the server context has been released.
  pub fn fail(mut self, err: E) -> Result<(), Error<E>> {
    self.inner.fail(err);

    // Must come after the hand-over above: `Drop` checks this flag to decide
    // whether a missing reply has to be reported.
    self.did_handover = true;

    Ok(())
  }
}
impl<I, E> Drop for ReplyContext<I, E> {
  /// Report a missing reply if the context is dropped unanswered.
  ///
  /// Nothing happens when `reply()` or `fail()` has already handed a result
  /// over.  Otherwise, if the shared state is still `State::Waiting`, it is
  /// switched to `State::NoReply` and the waiting side is woken up: a
  /// registered task waker (if there is one) is taken out of its slot and
  /// woken, and `signal` is notified via `notify_one()`.  Any other state is
  /// left untouched and nobody is woken.
  fn drop(&mut self) {
    // A result was already delivered; there is nothing to report.
    if self.did_handover {
      return;
    }

    // Update the state first and release the state lock before waking
    // anyone up.
    let mut state = self.inner.data.lock();
    let abandoned = if let State::Waiting = *state {
      *state = State::NoReply;
      true
    } else {
      false
    };
    drop(state);

    if abandoned {
      // The waker slot stays locked while waking and notifying, exactly as
      // before; `take()` leaves `None` behind in the slot.
      let mut waker_slot = self.inner.taskwaker.lock();
      if let Some(waker) = waker_slot.take() {
        waker.wake();
      }
      self.inner.signal.notify_one();
    }
  }
}
impl<I, E> From<InnerReplyContext<I, E>> for ReplyContext<I, E> {
  /// Wrap an internal reply context in its public counterpart.
  ///
  /// As part of the conversion the shared state is moved from
  /// `State::Queued` to `State::Waiting`, which signals that the node has
  /// left the queue.  The returned context starts out with no result handed
  /// over.
  ///
  /// # Panics
  /// Panics with "Unexpected node state." if the state is anything other
  /// than `State::Queued` when the conversion takes place.
  fn from(inner: InnerReplyContext<I, E>) -> Self {
    // Keep the guard in its own scope so the state lock is released before
    // the public context is built.
    {
      let mut state = inner.data.lock();
      if let State::Queued = *state {
        // Mark the reply context as "picked up".
        *state = State::Waiting;
      } else {
        // Should never happen.  Let go of the lock before unwinding.
        drop(state);
        panic!("Unexpected node state.");
      }
    }

    Self {
      // NOTE(review): `inner` is owned here, so this clone looks redundant.
      // Confirm that `InnerReplyContext`'s `Clone`/`Drop` have no side
      // effects this relies on before replacing it with a plain move.
      inner: inner.clone(),
      did_handover: false
    }
  }
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :