1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
//! _swctx_ is very similar to a one-shot channel, but with some added
//! semantics.
//!
//! ```
//! use std::thread;
//! use swctx::mkpair;
//!
//! let (sctx, wctx) = mkpair::<&str, &str, &str>();
//! let jh = thread::spawn(move || {
//! sctx.set_state("in thread");
//! sctx.set("hello");
//! });
//! jh.join().unwrap();
//!
//! assert_eq!(wctx.wait().unwrap(), "hello");
//! ```
//!
//! In a typical use-case an application or library calls [`mkpair()`] to
//! create a pair of linked [`SetCtx`] and [`WaitCtx`] object. The `SetCtx`
//! object is transferred to a remote thread/task, and the `WaitCtx` is used
//! wait for an object to arrive [from the thread/task the `SetCtx` is sent
//! to].
//!
//! Once the thread/task has data to send back to the `WaitCtx` it calls
//! [`SetCtx::set()`] to send the data.
//!
//! The `SetCtx` has an internal state, settable using [`SetCtx::set_state()`]
//! that will be reported back to the `WaitCtx`, which will return
//! [`Error::Aborted`], if the `SetCtx` is dropped prematurely.
//!
//! The `SetCtx` can also signal a failure by calling [`SetCtx::fail()`] and
//! pass along an application-specific error code. This will cause the
//! `WaitCtx` to unblock and return [`Error::App`].
mod err;
mod sctx;
pub(crate) mod wctx;
use std::{sync::Arc, task::Waker};
use parking_lot::{Condvar, Mutex};
pub use sctx::SetCtx;
pub use wctx::{WaitCtx, WaitFuture};
pub use err::Error;
enum State<T, S, E> {
/// Waiting for a delivery.
Waiting,
/// Data was delivered.
Data(T),
/// Reply is being returned to caller.
Finalized,
/// An error occurred.
Err(Error<S, E>)
}
struct Inner<T, S, E> {
state: State<T, S, E>,
sctx_state: S,
waker: Option<Waker>
}
impl<T, S, E> Inner<T, S, E> {
fn try_get(&mut self) -> Result<Option<T>, Error<S, E>> {
match self.state {
State::Waiting => Ok(None),
State::Data(_) => {
let old = std::mem::replace(&mut self.state, State::Finalized);
let State::Data(data) = old else {
panic!("Unable to extract data");
};
Ok(Some(data))
}
State::Err(_) => {
let old = std::mem::replace(&mut self.state, State::Finalized);
let State::Err(err) = old else {
panic!("Unable to extract error");
};
Err(err)
}
_ => {
panic!("Unexpected state");
}
}
}
}
struct Shared<T, S, E> {
inner: Mutex<Inner<T, S, E>>,
signal: Condvar
}
impl<T, S, E> Shared<T, S, E> {
fn notify_waiter(&self, inner: &mut Inner<T, S, E>) {
self.signal.notify_one();
if let Some(waker) = inner.waker.take() {
waker.wake()
}
}
}
/// Create a linked [`SetCtx`] and [`WaitCtx`] pair.
///
/// The `WaitCtx` is used to wait for a value to arrive from the `SetCtx`.
pub fn mkpair<T, S, E>() -> (SetCtx<T, S, E>, WaitCtx<T, S, E>)
where
S: Clone + Default
{
let inner = Inner {
state: State::Waiting,
sctx_state: S::default(),
waker: None
};
let sh = Shared {
inner: Mutex::new(inner),
signal: Condvar::new()
};
let sh = Arc::new(sh);
let sctx = SetCtx(Arc::clone(&sh));
let wctx = WaitCtx(sh);
(sctx, wctx)
}
// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :