use crate::{error::Compact, AsyncSuspender};
use core::{
fmt::{self, Debug, Formatter},
sync::atomic::{AtomicBool, Ordering},
task::Waker,
};
use std::{
mem,
sync::{Condvar, Mutex},
};
use std::{sync::Arc, thread};
/// Contents of the single slot shared between a handle and its flower.
///
/// The slot holds at most one thing at a time: a value passed through the
/// channel, a final success value, a final error, or nothing at all.
enum CompactTypeOpt<S: Send, R: Send, E: Send> {
    /// A value of type `S` placed in the slot by `send` / `send_async`.
    Channel(S),
    /// The final value placed in the slot by `success`.
    Success(R),
    /// The final error, wrapped in [`Compact`].
    Error(Compact<E>),
    /// The slot is empty.
    None,
}
impl<S, R, E> Default for CompactTypeOpt<S, R, E>
where
S: Send,
R: Send,
E: Send,
{
fn default() -> Self {
Self::None
}
}
/// Formats the slot as `Channel(..)`, `Success(..)`, `Error(..)` or the bare word `None`.
impl<S: Send + Debug, R: Send + Debug, E: Send + Debug> Debug for CompactTypeOpt<S, R, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Pick the variant label and its payload first so the tuple formatting
        // is written only once; the empty variant has no payload to print.
        let (label, payload) = match self {
            Self::Channel(sent) => ("Channel", sent as &dyn Debug),
            Self::Success(value) => ("Success", value as &dyn Debug),
            Self::Error(cause) => ("Error", cause as &dyn Debug),
            Self::None => return f.write_str("None"),
        };
        f.debug_tuple(label).field(payload).finish()
    }
}
impl<S: Send, R: Send, E: Send> CompactTypeOpt<S, R, E> {
    /// Moves the current contents out of the slot, leaving the empty variant behind.
    fn take(&mut self) -> Self {
        mem::replace(self, Self::None)
    }
}
/// State shared (through an `Arc`) by a flower, its handle and its state views.
struct InnerState<S: Send, R: Send, E: Send> {
    /// Set by `CompactHandle::activate`; cleared when a result is consumed.
    activated: AtomicBool,
    /// Set after a success/error has been stored in `mtx`; cleared when it is consumed.
    result_ready: AtomicBool,
    /// Set after a channel value has been stored in `mtx`; cleared by the consumer.
    channel_present: AtomicBool,
    /// The single slot that carries channel values and the final result.
    mtx: Mutex<CompactTypeOpt<S, R, E>>,
    /// A blocking `send` waits on this; the flower notifies it.
    cvar: Condvar,
    /// Set by `cancel`; cleared whenever a new handle or state view is created.
    canceled: AtomicBool,
}
/// Prints every field of the shared state; the output order differs from the
/// declaration order (`activated` is printed last).
impl<S: Send + Debug, R: Send + Debug, E: Send + Debug> Debug for InnerState<S, R, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without printing it becomes a compile error.
        let Self {
            activated,
            result_ready,
            channel_present,
            mtx,
            cvar,
            canceled,
        } = self;

        let mut out = f.debug_struct("InnerState");
        out.field("result_ready", result_ready);
        out.field("channel_present", channel_present);
        out.field("mtx", mtx);
        out.field("cvar", cvar);
        out.field("canceled", canceled);
        out.field("activated", activated);
        out.finish()
    }
}
impl<S: Send, R: Send, E: Send> Drop for InnerState<S, R, E> {
    /// Runs no custom cleanup; the fields are dropped as usual afterwards.
    ///
    /// NOTE(review): the reason for keeping an empty `Drop` impl is not visible
    /// in this file — confirm before removing it.
    fn drop(&mut self) {}
}
/// A cloneable view onto the state shared with a `CompactFlower`.
///
/// Created by `CompactFlower::state`; it exposes the id together with the
/// cancel and activity flags.
pub struct CompactFlowerState<S: Send, R: Send, E: Send> {
    /// Flags, slot and condition variable shared with the flower and its handle.
    state: Arc<InnerState<S, R, E>>,
    /// Waker storage plus the "async send pending" flag shared with the handle.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Identifier copied from the flower.
    id: usize,
}
/// Prints the shared state, the async suspender and the id, in that order.
impl<S: Send + Debug, R: Send + Debug, E: Send + Debug> Debug for CompactFlowerState<S, R, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;

        let mut out = f.debug_struct("CompactFlowerState");
        out.field("state", state);
        out.field("async_suspender", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
impl<S: Send, R: Send, E: Send> CompactFlowerState<S, R, E> {
    /// Returns the identifier copied from the originating flower.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Raises the shared cancel flag.
    pub fn cancel(&self) {
        let flag = &self.state.canceled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns `true` while the shared cancel flag is raised.
    pub fn is_canceled(&self) -> bool {
        let flag = &self.state.canceled;
        flag.load(Ordering::Relaxed)
    }

    /// Returns the current value of the shared `activated` flag.
    pub fn is_active(&self) -> bool {
        let flag = &self.state.activated;
        flag.load(Ordering::Relaxed)
    }
}
impl<S, R, E> Clone for CompactFlowerState<S, R, E>
where
S: Send,
R: Send,
E: Send,
{
fn clone(&self) -> Self {
Self {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
}
impl<S: Send, R: Send, E: Send> Drop for CompactFlowerState<S, R, E> {
    /// Runs no custom cleanup; the fields are dropped as usual afterwards.
    ///
    /// NOTE(review): the reason for keeping an empty `Drop` impl is not visible
    /// in this file — confirm before removing it.
    fn drop(&mut self) {}
}
/// The producer side of a `CompactFlower`.
///
/// Created by `CompactFlower::handle`; it sends channel values and stores the
/// final success or error in the shared slot.
pub struct CompactHandle<S: Send, R: Send, E: Send> {
    /// Flags, slot and condition variable shared with the flower.
    state: Arc<InnerState<S, R, E>>,
    /// Waker storage plus the "async send pending" flag used by `send_async`.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Identifier copied from the flower.
    id: usize,
}
impl<S, R, E> CompactHandle<S, R, E>
where
    S: Send,
    R: Send,
    E: Send,
{
    /// Returns the identifier copied from the flower that created this handle.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Sets the shared `activated` flag.
    pub fn activate(&self) {
        self.state.activated.store(true, Ordering::Relaxed);
    }

    /// Returns the current value of the shared `activated` flag.
    pub fn is_active(&self) -> bool {
        self.state.activated.load(Ordering::Relaxed)
    }

    /// Returns `true` while the shared cancel flag is raised.
    pub fn should_cancel(&self) -> bool {
        self.state.canceled.load(Ordering::Relaxed)
    }

    /// Stores `s` in the shared slot and blocks the calling thread on the
    /// condition variable until it is notified.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub fn send(&self, s: S) {
        let mut slot = self.state.mtx.lock().unwrap();
        *slot = CompactTypeOpt::Channel(s);
        self.state.channel_present.store(true, Ordering::Relaxed);
        // With the async flag cleared, the consumer notifies the condvar
        // instead of looking for a stored waker.
        self.async_suspender.1.store(false, Ordering::Relaxed);
        // NOTE(review): a single `wait` also returns on spurious wakeups (see
        // the `Condvar::wait` docs). Turning this into a predicate loop needs
        // the consumer to change the predicate while holding this mutex, so it
        // must be done together with the consumer side.
        drop(self.state.cvar.wait(slot));
    }

    /// Stores `s` in the shared slot and suspends the current task until the
    /// `AsyncSuspender` future completes.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub async fn send_async(&self, s: S) {
        {
            // Publish the value AND both flags while the guard is held. The
            // consumer locks this mutex after it observes `channel_present`,
            // so it is then guaranteed to also observe the async flag and wake
            // the task rather than notify the condvar. Publishing the flags
            // after the unlock gives no such guarantee with `Relaxed` stores.
            let mut slot = self.state.mtx.lock().unwrap();
            *slot = CompactTypeOpt::Channel(s);
            self.async_suspender.1.store(true, Ordering::Relaxed);
            self.state.channel_present.store(true, Ordering::Relaxed);
            // The guard is dropped at the end of this block, before the await.
        }
        AsyncSuspender {
            inner: self.async_suspender.clone(),
        }
        .await
    }

    /// Stores `r` as the final success value and marks the result as ready.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub fn success(&self, r: R) {
        *self.state.mtx.lock().unwrap() = CompactTypeOpt::Success(r);
        self.state.result_ready.store(true, Ordering::Relaxed);
    }

    /// Stores `e` (wrapped in `Compact::Suppose`) as the final error and marks
    /// the result as ready.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub fn error(&self, e: E) {
        *self.state.mtx.lock().unwrap() = CompactTypeOpt::Error(Compact::Suppose(e));
        self.state.result_ready.store(true, Ordering::Relaxed);
    }
}
impl<S, R, E> Drop for CompactHandle<S, R, E>
where
S: Send,
R: Send,
E: Send,
{
fn drop(&mut self) {
if thread::panicking() && !self.state.result_ready.load(Ordering::Relaxed) {
self.state.channel_present.store(false, Ordering::Relaxed);
let err = format!(
"the compact flower handle with id: {} error panicked!",
self.id
);
*self.state.mtx.lock().unwrap() = CompactTypeOpt::Error(Compact::Panicked(err));
self.state.result_ready.store(true, Ordering::Relaxed);
}
}
}
/// Prints the shared state, the async suspender (labelled `awaiting`) and the id.
impl<S: Send + Debug, R: Send + Debug, E: Send + Debug> Debug for CompactHandle<S, R, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;

        let mut out = f.debug_struct("CompactHandle");
        out.field("state", state);
        out.field("awaiting", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
/// Follow-up step returned by `CompactFlower::extract` and `CompactFlower::poll`.
///
/// Call [`CompactFinalizer::finalize`] on it to pick up the final result, if one is ready.
pub enum CompactFinalizer<'a, S, R, E>
where
    S: Send,
    R: Send,
    E: Send,
{
    /// Borrows the flower whose result should be checked.
    Try(&'a CompactFlower<S, R, E>),
}
impl<S, R, E> CompactFinalizer<'_, S, R, E>
where
S: Send,
R: Send,
E: Send,
{
pub fn finalize(self, f: impl FnOnce(Result<R, Compact<E>>)) {
let Self::Try(compact_flower) = self;
if compact_flower.state.result_ready.load(Ordering::Relaxed) {
let result = move || {
let result = compact_flower.state.mtx.lock().unwrap().take();
compact_flower
.state
.result_ready
.store(false, Ordering::Relaxed);
compact_flower
.state
.activated
.store(false, Ordering::Relaxed);
result
};
let result = result();
if let CompactTypeOpt::Success(value) = result {
f(Ok(value))
} else if let CompactTypeOpt::Error(err_type) = result {
f(Err(err_type))
}
}
}
}
/// The consumer side: owns the shared state and reads channel values and the
/// final result that a `CompactHandle` stores.
pub struct CompactFlower<S: Send, R: Send, E: Send> {
    /// Flags, slot and condition variable shared with handles and state views.
    state: Arc<InnerState<S, R, E>>,
    /// Waker storage plus the "async send pending" flag shared with the handle.
    async_suspender: Arc<(Mutex<Option<Waker>>, AtomicBool)>,
    /// Identifier supplied to `new`.
    id: usize,
}
impl<S, R, E> CompactFlower<S, R, E>
where
S: Send,
R: Send,
E: Send,
{
pub fn new(id: usize) -> Self {
Self {
state: Arc::new(InnerState {
activated: AtomicBool::new(false),
result_ready: AtomicBool::new(false),
channel_present: AtomicBool::new(false),
mtx: Mutex::new(CompactTypeOpt::None),
cvar: Condvar::new(),
canceled: AtomicBool::new(false),
}),
async_suspender: Arc::new((Mutex::new(None), AtomicBool::new(false))),
id,
}
}
pub fn id(&self) -> usize {
self.id
}
pub fn handle(&self) -> CompactHandle<S, R, E> {
self.state.canceled.store(false, Ordering::Relaxed);
CompactHandle {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
pub fn state(&self) -> CompactFlowerState<S, R, E> {
self.state.canceled.store(false, Ordering::Relaxed);
CompactFlowerState {
state: Clone::clone(&self.state),
async_suspender: Clone::clone(&self.async_suspender),
id: self.id,
}
}
pub fn cancel(&self) {
self.state.canceled.store(true, Ordering::Relaxed);
}
pub fn is_canceled(&self) -> bool {
self.state.canceled.load(Ordering::Relaxed)
}
pub fn is_active(&self) -> bool {
self.state.activated.load(Ordering::Relaxed)
}
pub fn result_is_ready(&self) -> bool {
self.state.result_ready.load(Ordering::Relaxed)
}
pub fn channel_is_present(&self) -> bool {
self.state.channel_present.load(Ordering::Relaxed)
}
pub fn try_result(&self, f: impl FnOnce(Result<R, Compact<E>>)) {
if self.state.channel_present.load(Ordering::Relaxed) {
self.state.cvar.notify_all();
self.state.channel_present.store(false, Ordering::Relaxed)
}
if self.state.result_ready.load(Ordering::Relaxed) {
let result = move || {
let result = self.state.mtx.lock().unwrap().take();
self.state.result_ready.store(false, Ordering::Relaxed);
self.state.activated.store(false, Ordering::Relaxed);
result
};
let result = result();
if let CompactTypeOpt::Success(value) = result {
f(Ok(value))
} else if let CompactTypeOpt::Error(err_type) = result {
f(Err(err_type))
}
}
}
pub fn extract(&self, f: impl FnOnce(S)) -> CompactFinalizer<'_, S, R, E> {
if self.state.channel_present.load(Ordering::Relaxed) {
let channel = move || {
let channel = self.state.mtx.lock().unwrap().take();
self.state.channel_present.store(false, Ordering::Relaxed);
if self.async_suspender.1.load(Ordering::Relaxed) {
let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
self.async_suspender.1.store(false, Ordering::Relaxed);
if let Some(waker) = mg_opt_waker.take() {
waker.wake();
}
} else {
self.state.cvar.notify_all();
}
channel
};
if let CompactTypeOpt::Channel(value) = channel() {
f(value)
}
}
CompactFinalizer::Try(self)
}
pub fn poll(&self, f: impl FnOnce(Option<S>)) -> CompactFinalizer<'_, S, R, E> {
if self.state.channel_present.load(Ordering::Relaxed) {
let channel = move || {
let channel = self.state.mtx.lock().unwrap().take();
self.state.channel_present.store(false, Ordering::Relaxed);
if self.async_suspender.1.load(Ordering::Relaxed) {
let mut mg_opt_waker = self.async_suspender.0.lock().unwrap();
self.async_suspender.1.store(false, Ordering::Relaxed);
if let Some(waker) = mg_opt_waker.take() {
waker.wake();
}
} else {
self.state.cvar.notify_all();
}
if let CompactTypeOpt::Channel(value) = channel {
Some(value)
} else {
None
}
};
let channel = channel();
f(channel)
} else {
f(None)
}
CompactFinalizer::Try(self)
}
}
/// Prints the shared state, the async suspender and the id, in that order.
impl<S: Send + Debug, R: Send + Debug, E: Send + Debug> Debug for CompactFlower<S, R, E> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            state,
            async_suspender,
            id,
        } = self;

        let mut out = f.debug_struct("CompactFlower");
        out.field("state", state);
        out.field("async_suspender", async_suspender);
        out.field("id", id);
        out.finish()
    }
}
impl<S: Send, R: Send, E: Send> Drop for CompactFlower<S, R, E> {
    /// Runs no custom cleanup; the fields are dropped as usual afterwards.
    ///
    /// NOTE(review): the reason for keeping an empty `Drop` impl is not visible
    /// in this file — confirm before removing it.
    fn drop(&mut self) {}
}