use std::fmt;
use std::io::ErrorKind;
use std::ptr;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::cancel::Cancel;
use crate::coroutine_impl::{co_cancel_data, run_coroutine, CoroutineImpl, EventSource};
use crate::scheduler::get_scheduler;
use crate::sync::atomic_dur::AtomicDuration;
use crate::sync::AtomicOption;
use crate::timeout_list::TimeoutHandle;
use crate::yield_now::{get_co_para, yield_now, yield_with};
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ParkError {
Canceled,
Timeout,
}
pub struct DropGuard<'a>(&'a Park);
pub struct Park {
wait_co: Arc<AtomicOption<CoroutineImpl>>,
state: AtomicBool,
check_cancel: AtomicBool,
timeout: AtomicDuration,
timeout_handle: AtomicPtr<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>>,
wait_kernel: AtomicBool,
}
impl Default for Park {
fn default() -> Self {
Park::new()
}
}
impl Park {
pub fn new() -> Self {
Park {
wait_co: Arc::new(AtomicOption::none()),
state: AtomicBool::new(false),
check_cancel: AtomicBool::new(true),
timeout: AtomicDuration::new(None),
timeout_handle: AtomicPtr::new(ptr::null_mut()),
wait_kernel: AtomicBool::new(false),
}
}
pub fn ignore_cancel(&self, ignore: bool) {
self.check_cancel.store(!ignore, Ordering::Relaxed);
}
#[inline]
fn set_timeout_handle(
&self,
handle: Option<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>>,
) -> Option<TimeoutHandle<Arc<AtomicOption<CoroutineImpl>>>> {
let ptr = match handle {
None => ptr::null_mut(),
Some(h) => h.into_ptr(),
};
let old_ptr = self.timeout_handle.swap(ptr, Ordering::Relaxed);
if old_ptr.is_null() {
None
} else {
Some(unsafe { TimeoutHandle::from_ptr(old_ptr) })
}
}
#[inline]
fn check_park(&self) -> bool {
if self.state.load(Ordering::Acquire) {
self.state.store(false, Ordering::Release);
return false;
}
!self.state.swap(false, Ordering::AcqRel)
}
#[inline]
pub(crate) fn unpark_impl(&self, b_sync: bool) {
if !self.state.swap(true, Ordering::AcqRel) {
self.wake_up(b_sync);
}
}
#[inline]
pub fn unpark(&self) {
self.unpark_impl(false);
}
#[inline]
fn remove_timeout_handle(&self) {
if let Some(h) = self.set_timeout_handle(None) {
if h.is_link() {
get_scheduler().del_timer(h);
}
}
}
#[inline]
fn wake_up(&self, b_sync: bool) {
if let Some(co) = self.wait_co.take() {
if b_sync {
run_coroutine(co);
} else {
get_scheduler().schedule(co);
}
}
}
#[inline]
fn fast_wake_up(&self) {
if let Some(co) = self.wait_co.take() {
run_coroutine(co);
}
}
pub fn park_timeout(&self, dur: Option<Duration>) -> Result<(), ParkError> {
if !self.check_park() {
return Ok(());
}
while self.wait_kernel.load(Ordering::Acquire) {
yield_now();
}
self.timeout.store(dur);
yield_with(self);
self.check_park();
self.remove_timeout_handle();
if let Some(err) = get_co_para() {
match err.kind() {
ErrorKind::TimedOut => return Err(ParkError::Timeout),
ErrorKind::Other => return Err(ParkError::Canceled),
_ => unreachable!("unexpected return error kind"),
}
}
Ok(())
}
fn delay_drop(&self) -> DropGuard {
self.wait_kernel.store(true, Ordering::Release);
DropGuard(self)
}
}
impl Drop for DropGuard<'_> {
fn drop(&mut self) {
self.0.wait_kernel.store(false, Ordering::Release);
}
}
impl Drop for Park {
fn drop(&mut self) {
while self.wait_kernel.load(Ordering::Acquire) {
yield_now();
}
self.set_timeout_handle(None);
}
}
impl EventSource for Park {
fn subscribe(&mut self, co: CoroutineImpl) {
let cancel = co_cancel_data(&co);
let timeout_handle = self
.timeout
.take()
.map(|dur| get_scheduler().add_timer(dur, self.wait_co.clone()));
self.set_timeout_handle(timeout_handle);
let _g = self.delay_drop();
self.wait_co.store(co);
if self.state.load(Ordering::Acquire) {
return self.fast_wake_up();
}
cancel.set_co(self.wait_co.clone());
if cancel.is_canceled() {
unsafe { cancel.cancel() };
}
}
fn yield_back(&self, cancel: &'static Cancel) {
if self.check_cancel.load(Ordering::Relaxed) {
cancel.check_cancel();
}
}
}
impl fmt::Debug for Park {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Park").field("state", &self.state).finish()
}
}