#![deny(unsafe_code)]
use core::{
fmt::{self, Debug, Formatter},
future::Future,
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, Waker},
};
use std::{
error::Error,
mem,
sync::{Condvar, Mutex},
};
use std::{sync::Arc, thread};
#[cfg(feature = "compact")]
mod compact;
#[cfg(feature = "compact")]
pub use compact::*;
use error::Cause;
pub mod error;
/// Contents of the single value slot shared between a `Handle` and its `Flower`.
enum TypeOpt<S: Send, R: Send> {
    /// Intermediate value stored by `Handle::send` / `Handle::send_async`.
    Channel(S),
    /// Final value stored by `Handle::success`.
    Success(R),
    /// Failure description stored by the `Handle` error setters or by its `Drop` impl.
    Error(Cause),
    /// Empty slot; this is also what `take` leaves behind.
    None,
}
impl<S, R> Default for TypeOpt<S, R>
where
S: Send,
R: Send,
{
fn default() -> Self {
Self::None
}
}
impl<S: Send + Debug, R: Send + Debug> Debug for TypeOpt<S, R> {
    /// Prints the variant name, wrapping the payload (if any) as a debug tuple.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Pick the label and payload first so the tuple formatting is written once.
        let (label, payload): (&str, &dyn Debug) = match self {
            Self::None => return f.write_str("None"),
            Self::Channel(sent) => ("Channel", sent),
            Self::Success(value) => ("Success", value),
            Self::Error(cause) => ("Error", cause),
        };
        f.debug_tuple(label).field(payload).finish()
    }
}
impl<S: Send, R: Send> TypeOpt<S, R> {
    /// Moves the current content out of the slot, leaving `TypeOpt::None` in its place.
    fn take(&mut self) -> Self {
        mem::replace(self, Self::None)
    }
}
/// State shared (behind an `Arc`) by `Flower`, `Handle` and `FlowerState`.
struct InnerState<S: Send, R: Send> {
    /// Set by `Handle::activate`; cleared when a result is taken out of the slot.
    activated: AtomicBool,
    /// Set after a `Success`/`Error` was written into `mtx`; cleared when it is taken.
    result_ready: AtomicBool,
    /// Set after a `Channel` value was written into `mtx`; cleared by the consumer.
    channel_present: AtomicBool,
    /// The value slot itself.
    mtx: Mutex<TypeOpt<S, R>>,
    /// Condition variable the blocking `Handle::send` waits on.
    cvar: Condvar,
    /// Cancellation request flag, read through `Handle::should_cancel`.
    canceled: AtomicBool,
}
impl<S: Send + Debug, R: Send + Debug> Debug for InnerState<S, R> {
    /// Dumps every field; the field order below is the order of the printed output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            activated,
            result_ready,
            channel_present,
            mtx,
            cvar,
            canceled,
        } = self;
        let mut out = f.debug_struct("InnerState");
        out.field("result_ready", result_ready);
        out.field("channel_present", channel_present);
        out.field("mtx", mtx);
        out.field("cvar", cvar);
        out.field("canceled", canceled);
        out.field("activated", activated);
        out.finish()
    }
}
impl<S: Send, R: Send> Drop for InnerState<S, R> {
    /// Performs no custom teardown; the fields are dropped normally afterwards.
    ///
    /// NOTE(review): an explicit `Drop` impl prevents moving fields out of the
    /// value — presumably that is why this empty impl exists; confirm before removing.
    fn drop(&mut self) {}
}
/// Cloneable view onto a `Flower`'s shared state, obtained from `Flower::state`.
///
/// It exposes the id, the activity flag and the cancellation flag only.
pub struct FlowerState<S: Send, R: Send> {
    /// Shared flags and value slot.
    state: Arc<InnerState<S, R>>,
    /// Waker slot and "async sender is suspended" flag shared with the `Handle`.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Identifier copied from the originating `Flower`.
    id: usize,
}
impl<S: Send + Debug, R: Send + Debug> Debug for FlowerState<S, R> {
    /// Prints the shared state, the async suspender and the id, in that order.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;
        let mut out = f.debug_struct("FlowerState");
        out.field("state", state);
        out.field("async_suspender", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
impl<S: Send, R: Send> FlowerState<S, R> {
    /// Returns the identifier this state was created with.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Raises the shared cancellation flag.
    pub fn cancel(&self) {
        let flag = &self.state.canceled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` while the shared cancellation flag is raised.
    pub fn is_canceled(&self) -> bool {
        let flag = &self.state.canceled;
        flag.load(Ordering::Relaxed)
    }

    /// Returns `true` while the shared `activated` flag is set.
    pub fn is_active(&self) -> bool {
        let flag = &self.state.activated;
        flag.load(Ordering::Relaxed)
    }
}
impl<S, R> Clone for FlowerState<S, R>
where
S: Send,
R: Send,
{
fn clone(&self) -> Self {
Self {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
}
impl<S: Send, R: Send> Drop for FlowerState<S, R> {
    /// Performs no custom teardown; the fields are dropped normally afterwards.
    ///
    /// NOTE(review): an explicit `Drop` impl prevents moving fields out of the
    /// value — presumably that is why this empty impl exists; confirm before removing.
    fn drop(&mut self) {}
}
/// Future that stays pending for as long as the shared "suspended" flag is set.
struct AsyncSuspender {
    /// `.0` is the slot the current waker is parked in, `.1` is the "suspended" flag.
    inner: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
}

impl Future for AsyncSuspender {
    type Output = ();

    /// Completes once the flag is clear; otherwise parks the task's waker and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (waker_slot, suspended) = &*self.inner;
        // The flag is inspected while the waker slot is locked, so a consumer that
        // clears the flag under the same lock cannot slip in between check and park.
        let mut parked = waker_slot.lock().unwrap();
        if suspended.load(Ordering::Relaxed) {
            *parked = Some(cx.waker().clone());
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
/// Producer side of a `Flower`: publishes channel values and the final result.
pub struct Handle<S: Send, R: Send> {
    /// Shared flags and value slot.
    state: Arc<InnerState<S, R>>,
    /// Waker slot and "async sender is suspended" flag used by `send_async`.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Identifier copied from the originating `Flower`.
    id: usize,
}
impl<S, R> Handle<S, R>
where
    S: Send,
    R: Send,
{
    /// Returns the identifier of the `Flower` this handle belongs to.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Sets the shared `activated` flag.
    pub fn activate(&self) {
        self.state.activated.store(true, Ordering::Relaxed);
    }

    /// Returns `true` while the shared `activated` flag is set.
    pub fn is_active(&self) -> bool {
        self.state.activated.load(Ordering::Relaxed)
    }

    /// Returns `true` when cancellation has been requested on the shared state.
    pub fn should_cancel(&self) -> bool {
        self.state.canceled.load(Ordering::Relaxed)
    }

    /// Stores `s` as the current channel value and blocks the calling thread on the
    /// condition variable until the consumer side notifies it.
    ///
    /// The result of the wait (including a poison error) is discarded.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    pub fn send(&self, s: S) {
        let mut slot = self.state.mtx.lock().unwrap();
        *slot = TypeOpt::Channel(s);
        self.state.channel_present.store(true, Ordering::Relaxed);
        // Mark the pending sender as a blocking one so the consumer uses the condvar.
        self.async_suspender.1.store(false, Ordering::Relaxed);
        // NOTE(review): `Condvar::wait` is documented to allow spurious wakeups and no
        // predicate is re-checked here, so this may return before the value was consumed.
        drop(self.state.cvar.wait(slot));
    }

    /// Stores `s` as the current channel value and suspends the task until the
    /// consumer side clears the "suspended" flag.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    pub async fn send_async(&self, s: S) {
        {
            // Publish both flags while the slot is still locked. The consumer locks the
            // slot before it inspects the async flag, so it is guaranteed to observe the
            // flag set here; storing it after unlocking (with `Relaxed`) would allow the
            // consumer to see `channel_present` but a stale flag and never wake this task.
            let mut slot = self.state.mtx.lock().unwrap();
            *slot = TypeOpt::Channel(s);
            self.async_suspender.1.store(true, Ordering::Relaxed);
            self.state.channel_present.store(true, Ordering::Relaxed);
            // The guard is released at the end of this scope, i.e. before awaiting.
        }
        AsyncSuspender {
            inner: self.async_suspender.clone(),
        }
        .await
    }

    /// Publishes `r` as the final result; an `Err` is recorded via `error_verbose`
    /// (i.e. using the error's `Debug` representation).
    pub fn set_result(&self, r: Result<R, Box<dyn Error>>) {
        match r {
            Ok(val) => self.success(val),
            Err(e) => self.error_verbose(e),
        }
    }

    /// Publishes `r` as the final result; an `Err` is recorded via `error`
    /// (i.e. using the error's `to_string()` representation).
    pub fn set_result_no_verbose(&self, r: Result<R, Box<dyn Error>>) {
        match r {
            Ok(val) => self.success(val),
            Err(e) => self.error(e),
        }
    }

    /// Stores `r` as the successful result and marks the result as ready.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    pub fn success(&self, r: R) {
        *self.state.mtx.lock().unwrap() = TypeOpt::Success(r);
        self.state.result_ready.store(true, Ordering::Relaxed);
    }

    /// Stores `e.to_string()` as a `Cause::Suppose` error and marks the result as ready.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    pub fn error(&self, e: impl ToString) {
        *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(e.to_string()));
        self.state.result_ready.store(true, Ordering::Relaxed);
    }

    /// Stores the `Debug` rendering of `e` as a `Cause::Suppose` error and marks the
    /// result as ready.
    ///
    /// # Panics
    /// Panics if the slot mutex is poisoned.
    pub fn error_verbose(&self, e: Box<dyn Error>) {
        let err_kind = format!("{:?}", e);
        *self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Suppose(err_kind));
        self.state.result_ready.store(true, Ordering::Relaxed);
    }
}
impl<S, R> Drop for Handle<S, R>
where
S: Send,
R: Send,
{
fn drop(&mut self) {
if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) {
self.state.channel_present.store(false, Ordering::Relaxed);
let err = format!("the flower handle with id: {} error panicked!", self.id);
*self.state.mtx.lock().unwrap() = TypeOpt::Error(Cause::Panicked(err));
self.state.result_ready.store(true, Ordering::Relaxed);
}
}
}
impl<S: Send + Debug, R: Send + Debug> Debug for Handle<S, R> {
    /// Prints the shared state, the async suspender (labelled `awaiting`) and the id.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;
        let mut out = f.debug_struct("Handle");
        out.field("state", state);
        out.field("awaiting", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
/// Follow-up step returned by `Flower::extract` / `Flower::poll`, used to collect
/// the final result through `finalize`.
pub enum Finalizer<'a, S, R>
where
    S: Send,
    R: Send,
{
    /// Borrows the flower whose result should be looked up.
    Try(&'a Flower<S, R>),
}
impl<S, R> Finalizer<'_, S, R>
where
S: Send,
R: Send,
{
pub fn finalize(self, f: impl FnOnce(Result<R, Cause>)) {
let Self::Try(flower) = self;
if flower.state.result_ready.load(Ordering::Relaxed) {
let result = move || {
let result = flower.state.mtx.lock().unwrap().take();
flower.state.result_ready.store(false, Ordering::Relaxed);
flower.state.activated.store(false, Ordering::Relaxed);
result
};
let result = result();
if let TypeOpt::Success(value) = result {
f(Ok(value))
} else if let TypeOpt::Error(err_type) = result {
f(Err(err_type))
}
}
}
}
/// Consumer side of the flow: reads channel values and the final result that a
/// `Handle` publishes.
pub struct Flower<S: Send, R: Send> {
    /// Shared flags and value slot.
    state: Arc<InnerState<S, R>>,
    /// Waker slot and "async sender is suspended" flag shared with the `Handle`.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Caller-chosen identifier.
    id: usize,
}
impl<S, R> Flower<S, R>
where
S: Send,
R: Send,
{
pub fn new(id: usize) -> Self {
Self {
state: Arc::new(InnerState {
activated: AtomicBool::new(false),
result_ready: AtomicBool::new(false),
channel_present: AtomicBool::new(false),
mtx: Mutex::new(TypeOpt::None),
cvar: Condvar::new(),
canceled: AtomicBool::new(false),
}),
async_suspender: Arc::new((Mutex::new(None), AtomicBool::new(false))),
id,
}
}
pub fn id(&self) -> usize {
self.id
}
pub fn handle(&self) -> Handle<S, R> {
self.state.canceled.store(false, Ordering::Relaxed);
Handle {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
pub fn state(&self) -> FlowerState<S, R> {
self.state.canceled.store(false, Ordering::Relaxed);
FlowerState {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
pub fn cancel(&self) {
self.state.canceled.store(true, Ordering::Relaxed);
}
pub fn is_canceled(&self) -> bool {
self.state.canceled.load(Ordering::Relaxed)
}
pub fn is_active(&self) -> bool {
self.state.activated.load(Ordering::Relaxed)
}
pub fn result_is_ready(&self) -> bool {
self.state.result_ready.load(Ordering::Relaxed)
}
pub fn channel_is_present(&self) -> bool {
self.state.channel_present.load(Ordering::Relaxed)
}
pub fn try_result(&self, f: impl FnOnce(Result<R, Cause>)) {
if self.state.channel_present.load(Ordering::Relaxed) {
self.state.cvar.notify_all();
self.state.channel_present.store(false, Ordering::Relaxed)
}
if self.state.result_ready.load(Ordering::Relaxed) {
let result = move || {
let result = self.state.mtx.lock().unwrap().take();
self.state.result_ready.store(false, Ordering::Relaxed);
self.state.activated.store(false, Ordering::Relaxed);
result
};
let result = result();
if let TypeOpt::Success(value) = result {
f(Ok(value))
} else if let TypeOpt::Error(err_type) = result {
f(Err(err_type))
}
}
}
pub fn extract(&self, f: impl FnOnce(S)) -> Finalizer<'_, S, R> {
if self.state.channel_present.load(Ordering::Relaxed) {
let channel = move || {
let channel = self.state.mtx.lock().unwrap().take();
self.state.channel_present.store(false, Ordering::Relaxed);
if self.async_suspender.1.load(Ordering::Relaxed) {
let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
self.async_suspender.1.store(false, Ordering::Relaxed);
if let Some(waker) = mg_opt_waker.take() {
waker.wake();
}
} else {
self.state.cvar.notify_all();
}
channel
};
if let TypeOpt::Channel(value) = channel() {
f(value)
}
}
Finalizer::Try(self)
}
pub fn poll(&self, f: impl FnOnce(Option<S>)) -> Finalizer<'_, S, R> {
if self.state.channel_present.load(Ordering::Relaxed) {
let channel = move || {
let channel = self.state.mtx.lock().unwrap().take();
self.state.channel_present.store(false, Ordering::Relaxed);
if self.async_suspender.1.load(Ordering::Relaxed) {
let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
self.async_suspender.1.store(false, Ordering::Relaxed);
if let Some(waker) = mg_opt_waker.take() {
waker.wake();
}
} else {
self.state.cvar.notify_all();
}
if let TypeOpt::Channel(value) = channel {
Some(value)
} else {
None
}
};
let channel = channel();
f(channel)
} else {
f(None)
}
Finalizer::Try(self)
}
}
impl<S: Send + Debug, R: Send + Debug> Debug for Flower<S, R> {
    /// Prints the shared state, the async suspender and the id, in that order.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;
        let mut out = f.debug_struct("Flower");
        out.field("state", state);
        out.field("async_suspender", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
impl<S: Send, R: Send> Drop for Flower<S, R> {
    /// Performs no custom teardown; the fields are dropped normally afterwards.
    ///
    /// NOTE(review): an explicit `Drop` impl prevents moving fields out of the
    /// value — presumably that is why this empty impl exists; confirm before removing.
    fn drop(&mut self) {}
}
/// Conversion of a value into a `Result` whose error is a boxed `Error`.
pub trait IntoResult<T> {
    /// Returns the contained value as `Ok`, or an `Err` whose message is
    /// `error_msg.to_string()` when there is no value.
    fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>>;
}

impl<T> IntoResult<T> for Option<T> {
    /// Maps `Some(val)` to `Ok(val)` and `None` to an error carrying `error_msg`.
    ///
    /// The message is only rendered on the `None` path, so the successful path does
    /// not allocate.
    fn catch(self, error_msg: impl ToString) -> Result<T, Box<dyn Error>> {
        self.ok_or_else(|| Box::<dyn Error>::from(error_msg.to_string()))
    }
}