use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use parking_lot::{Condvar, Mutex};
use crate::coroutine_impl::is_coroutine;
use crate::park::{Park, ParkError};
#[derive(Debug)]
#[allow(clippy::mutex_atomic)]
pub struct ThreadPark {
lock: Mutex<bool>,
cvar: Condvar,
}
#[allow(clippy::mutex_atomic)]
impl ThreadPark {
fn new() -> Self {
ThreadPark {
lock: Mutex::new(false),
cvar: Condvar::new(),
}
}
fn park_timeout(&self, dur: Option<Duration>) -> Result<(), ParkError> {
let mut result = Ok(());
let mut guard = self.lock.lock();
while !*guard && result.is_ok() {
match dur {
None => self.cvar.wait(&mut guard),
Some(t) => {
let t = self.cvar.wait_for(&mut guard, t);
if t.timed_out() {
result = Err(ParkError::Timeout);
}
}
};
}
*guard = false;
result
}
fn unpark(&self) -> Result<(), ParkError> {
let mut guard = self.lock.lock();
if !*guard {
*guard = true;
self.cvar.notify_one();
}
Ok(())
}
}
#[derive(Debug)]
pub enum Parker {
Coroutine(Park),
Thread(ThreadPark),
}
#[derive(Debug)]
pub struct Blocker {
parker: Parker,
}
impl Blocker {
pub fn new(ignore_cancel: bool) -> Self {
let parker = if is_coroutine() {
let park = Park::new();
park.ignore_cancel(ignore_cancel);
Parker::Coroutine(park)
} else {
let park = ThreadPark::new();
Parker::Thread(park)
};
Blocker { parker }
}
pub fn current() -> Arc<Self> {
Arc::new(Self::new(false))
}
#[inline]
pub fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
match self.parker {
Parker::Coroutine(ref co) => co.park_timeout(timeout)?,
Parker::Thread(ref t) => t.park_timeout(timeout)?,
}
Ok(())
}
#[inline]
pub fn unpark(&self) -> Result<(), ParkError> {
match self.parker {
Parker::Coroutine(ref co) => co.unpark(),
Parker::Thread(ref t) => t.unpark()?,
}
Ok(())
}
}
#[derive(Debug, Default)]
pub struct FastBlocker(Park);
impl FastBlocker {
pub fn new() -> Self {
if !is_coroutine() {
panic!("only possible to block coroutine");
}
FastBlocker(Park::new())
}
#[inline]
pub fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
self.0.park_timeout(timeout)
}
#[inline]
pub fn unpark(&self) {
self.0.unpark_impl(true)
}
}
#[derive(Debug)]
pub struct SyncBlocker {
unparked: AtomicBool,
release: AtomicBool,
blocker: Blocker,
}
impl SyncBlocker {
pub fn current() -> Arc<Self> {
let blocker = Blocker::new(true);
Arc::new(SyncBlocker {
unparked: AtomicBool::new(false),
release: AtomicBool::new(false),
blocker,
})
}
#[inline]
pub fn is_unparked(&self) -> bool {
self.unparked.load(Ordering::Acquire)
}
#[inline]
pub fn set_release(&self) {
self.release.store(true, Ordering::Release);
}
#[inline]
pub fn take_release(&self) -> bool {
self.release.swap(false, Ordering::Acquire)
}
#[inline]
pub fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
self.blocker.park(timeout)
}
#[inline]
pub fn unpark(&self) -> Result<(), ParkError> {
self.blocker.unpark()?;
self.unparked.store(true, Ordering::Release);
Ok(())
}
}