use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use super::{blocking::ThreadPark, AtomicOption};
use crate::coroutine_impl::{
co_cancel_data, is_coroutine, run_coroutine, CoroutineImpl, EventSource,
};
use crate::park::ParkError;
use crate::scheduler::get_scheduler;
use crate::yield_now::{get_co_para, yield_with};
pub struct Park {
wait_co: Arc<AtomicOption<CoroutineImpl>>,
state: AtomicBool,
container: UnsafeCell<Option<Arc<Blocker>>>,
}
impl Park {
pub fn new() -> Self {
Park {
wait_co: Arc::new(AtomicOption::none()),
state: AtomicBool::new(false),
container: UnsafeCell::new(None),
}
}
pub fn park(&self, container: Arc<Blocker>) -> Result<(), ParkError> {
if self.state.load(Ordering::Acquire) {
return Ok(());
}
unsafe { *self.container.get() = Some(container) };
yield_with(self);
if let Some(err) = get_co_para() {
match err.kind() {
std::io::ErrorKind::Other => return Err(ParkError::Canceled),
std::io::ErrorKind::TimedOut => return Err(ParkError::Timeout),
_ => unreachable!("unexpected return error kind"),
}
}
Ok(())
}
#[inline]
pub fn unpark(&self) {
self.state.store(true, Ordering::Release);
if let Some(co) = self.wait_co.take() {
get_scheduler().schedule(co);
}
}
}
impl EventSource for Park {
fn subscribe(&mut self, co: CoroutineImpl) {
let cancel = co_cancel_data(&co);
let _container = self.container.get_mut().take().unwrap();
self.wait_co.store(co);
if self.state.load(Ordering::Acquire) {
if !self.wait_co.is_none() {
if let Some(co) = self.wait_co.take() {
run_coroutine(co);
}
}
return;
}
cancel.set_co(self.wait_co.clone());
if cancel.is_canceled() {
unsafe { cancel.cancel() };
}
}
}
pub enum Blocker {
Coroutine(Park),
Thread(ThreadPark),
}
impl Blocker {
pub fn new() -> Arc<Self> {
let blocker = if is_coroutine() {
Blocker::Coroutine(Park::new())
} else {
Blocker::Thread(ThreadPark::new())
};
Arc::new(blocker)
}
#[inline]
pub fn park(self: &Arc<Self>) -> Result<(), ParkError> {
match self.as_ref() {
Blocker::Coroutine(ref co) => co.park(self.clone()),
Blocker::Thread(ref t) => t.park_timeout(None),
}
}
#[inline]
pub fn unpark(&self) {
match self {
Blocker::Coroutine(ref co) => co.unpark(),
Blocker::Thread(ref t) => t.unpark(),
}
}
}