use crate::primitive::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use crate::primitive::sync::{Arc, Condvar, Mutex};
use std::fmt;
use std::marker::PhantomData;
use std::time::{Duration, Instant};
/// The blocking side of a park/unpark pair.
///
/// A `Parker` owns an [`Unparker`] that refers to the same shared state; the
/// parking methods block the calling thread on that state, and any clone of
/// the `Unparker` can deliver the notification that releases it.
pub struct Parker {
/// Handle to the shared state; this is also what `Parker::unparker` lends out.
unparker: Unparker,
/// `*const ()` is neither `Send` nor `Sync`, so this zero-sized marker opts
/// `Parker` out of both auto traits (`Send` is re-added by an explicit impl).
_marker: PhantomData<*const ()>,
}
// The `PhantomData<*const ()>` field suppresses the automatic `Send` impl.
// Restoring it here lets a `Parker` be moved to another thread, while the
// absence of a matching `Sync` impl keeps `&Parker` from being shared between
// threads.
unsafe impl Send for Parker {}
impl Default for Parker {
    /// Builds a parker with freshly allocated shared state.
    ///
    /// The state starts out as `EMPTY`, i.e. with no notification pending, and
    /// is owned by a single `Arc` held through the embedded `Unparker`.
    fn default() -> Self {
        let shared = Inner {
            state: AtomicUsize::new(EMPTY),
            lock: Mutex::new(()),
            cvar: Condvar::new(),
        };
        let unparker = Unparker {
            inner: Arc::new(shared),
        };

        Self {
            unparker,
            _marker: PhantomData,
        }
    }
}
impl Parker {
pub fn new() -> Parker {
Self::default()
}
pub fn park(&self) {
self.unparker.inner.park(None);
}
pub fn park_timeout(&self, timeout: Duration) {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.park_deadline(deadline),
None => self.park(),
}
}
pub fn park_deadline(&self, deadline: Instant) {
self.unparker.inner.park(Some(deadline))
}
pub fn unparker(&self) -> &Unparker {
&self.unparker
}
pub fn into_raw(this: Parker) -> *const () {
Unparker::into_raw(this.unparker)
}
pub unsafe fn from_raw(ptr: *const ()) -> Parker {
Parker {
unparker: Unparker::from_raw(ptr),
_marker: PhantomData,
}
}
}
impl fmt::Debug for Parker {
    /// Prints a fixed placeholder instead of the internal state; `pad` is used
    /// so width, fill, alignment and precision flags are still applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const PLACEHOLDER: &str = "Parker { .. }";
        f.pad(PLACEHOLDER)
    }
}
/// The notifying side of a park/unpark pair.
///
/// An `Unparker` is a reference-counted handle to the state a thread parks
/// on; cloning it yields another handle to the same state.
pub struct Unparker {
/// Shared parking state (atomic state word, mutex and condition variable).
inner: Arc<Inner>,
}
// Explicitly marks `Unparker` as movable to, and shareable between, threads.
// Its only field is an `Arc<Inner>`, and `Inner` consists of an atomic, a
// mutex and a condition variable.
// NOTE(review): with the std versions of those types the auto traits would
// already apply; presumably these impls exist for alternative
// `crate::primitive` backends — confirm.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}
impl Unparker {
pub fn unpark(&self) {
self.inner.unpark()
}
pub fn into_raw(this: Unparker) -> *const () {
Arc::into_raw(this.inner).cast::<()>()
}
pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
Unparker {
inner: Arc::from_raw(ptr.cast::<Inner>()),
}
}
}
impl fmt::Debug for Unparker {
    /// Prints a fixed placeholder instead of the internal state; `pad` is used
    /// so width, fill, alignment and precision flags are still applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const PLACEHOLDER: &str = "Unparker { .. }";
        f.pad(PLACEHOLDER)
    }
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
/// No notification is pending and no thread is waiting; the initial state.
const EMPTY: usize = 0;
/// A thread has committed to waiting on the condition variable in `Inner::park`.
const PARKED: usize = 1;
/// `Inner::unpark` has stored a notification that `Inner::park` has not yet consumed.
const NOTIFIED: usize = 2;
/// State shared between a `Parker` and all of its `Unparker` handles.
struct Inner {
/// Current parking state: one of `EMPTY`, `PARKED` or `NOTIFIED`.
state: AtomicUsize,
/// Held by the parking thread from before it sets `PARKED` until it waits
/// on `cvar`; `unpark` briefly acquires it before notifying.
lock: Mutex<()>,
/// Condition variable the parked thread waits on.
cvar: Condvar,
}
impl Inner {
fn park(&self, deadline: Option<Instant>) {
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
if let Some(deadline) = deadline {
if deadline <= Instant::now() {
return;
}
}
let mut m = self.lock.lock().unwrap();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
let old = self.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}
loop {
m = match deadline {
None => self.cvar.wait(m).unwrap(),
Some(deadline) => {
let now = Instant::now();
if now < deadline {
self.cvar.wait_timeout(m, deadline - now).unwrap().0
} else {
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED | PARKED => return,
n => panic!("inconsistent park_timeout state: {}", n),
};
}
}
};
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
}
}
pub(crate) fn unpark(&self) {
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, NOTIFIED => return, PARKED => {} _ => panic!("inconsistent state in unpark"),
}
drop(self.lock.lock().unwrap());
self.cvar.notify_one();
}
}