use std::any::Any;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::Result;
use crate::coroutine_impl::Coroutine;
use crate::std::sync::{AtomicOption, Blocker};
use crossbeam::atomic::AtomicCell;
use generator::Error;
pub struct Join {
to_wake: AtomicOption<Arc<Blocker>>,
state: AtomicBool,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
}
impl Join {
pub fn new(panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>) -> Self {
Join {
to_wake: AtomicOption::none(),
state: AtomicBool::new(true),
panic,
}
}
pub fn set_panic_data(&self, panic: Box<dyn Any + Send>) {
self.panic.swap(Some(panic));
}
pub fn trigger(&self) {
self.state.store(false, Ordering::Release);
if let Some(w) = self.to_wake.take() {
w.unpark();
}
}
fn wait(&self) {
if self.state.load(Ordering::Acquire) {
let cur = Blocker::current();
self.to_wake.swap(cur.clone());
if self.state.load(Ordering::Acquire) {
} else if let Some(w) = self.to_wake.take() {
w.unpark();
}
cur.park(None).ok();
}
}
}
pub struct JoinHandle<T> {
co: Coroutine,
join: Arc<Join>,
packet: Arc<AtomicCell<Option<T>>>,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
}
unsafe impl<T> Send for JoinHandle<T> {}
unsafe impl<T> Sync for JoinHandle<T> {}
pub fn make_join_handle<T>(
co: Coroutine,
join: Arc<Join>,
packet: Arc<AtomicCell<Option<T>>>,
panic: Arc<AtomicCell<Option<Box<dyn Any + Send>>>>,
) -> JoinHandle<T> {
JoinHandle {
co,
join,
packet,
panic,
}
}
impl<T> JoinHandle<T> {
pub fn coroutine(&self) -> &Coroutine {
&self.co
}
pub fn is_done(&self) -> bool {
!self.join.state.load(Ordering::Acquire)
}
pub fn wait(&self) {
self.join.wait();
}
pub fn join(self) -> Result<T> {
self.join.wait();
self.packet
.take()
.ok_or_else(|| self.panic.take().unwrap_or_else(|| Box::new(Error::Cancel)))
}
}
impl<T> fmt::Debug for JoinHandle<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("JoinHandle { .. }")
}
}