use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::driver::{self, Driver};
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
/// Parking half of a park/unpark pair.
///
/// Wraps a reference-counted [`Inner`]; an [`Unparker`] obtained from
/// `Parker::unpark` points at the same `Inner` and can wake a thread blocked
/// in `Parker::park`.
pub(crate) struct Parker {
/// State shared with every `Unparker` handed out by this parker.
inner: Arc<Inner>,
}
/// Waking half of a park/unpark pair.
///
/// Holds the same [`Inner`] as the [`Parker`] it was created from.
pub(crate) struct Unparker {
/// State shared with the originating `Parker`.
inner: Arc<Inner>,
}
/// Per-parker state shared between a `Parker` and its `Unparker`s.
struct Inner {
/// Current park state: one of `EMPTY`, `PARKED_CONDVAR`, `PARKED_DRIVER`
/// or `NOTIFIED`.
state: AtomicUsize,
/// Mutex paired with `condvar`; held by the parking thread while it moves to
/// `PARKED_CONDVAR` and starts waiting.
mutex: Mutex<()>,
/// Condition variable the thread blocks on when it could not lock the driver.
condvar: Condvar,
/// Resources shared across cloned parkers (a clone gets fresh `state`, `mutex`
/// and `condvar` but reuses this `Arc`).
shared: Arc<Shared>,
}
/// Initial state: no thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// The parking thread is (about to be) blocked on the condition variable.
const PARKED_CONDVAR: usize = 1;
/// The parking thread is (about to be) blocked inside the driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` has been called; the next `park` consumes this and returns.
const NOTIFIED: usize = 3;
/// State shared between all parkers cloned from the same original `Parker`.
struct Shared {
/// The driver, guarded by a try-lock: a caller that fails to acquire it does
/// not wait for it (see `Inner::park`, `Parker::park_timeout`,
/// `Inner::shutdown`).
driver: TryLock<Driver>,
}
impl Parker {
pub(crate) fn new(driver: Driver) -> Parker {
Parker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
shared: Arc::new(Shared {
driver: TryLock::new(driver),
}),
}),
}
}
pub(crate) fn unpark(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.inner.park(handle);
}
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
assert_eq!(duration, Duration::from_millis(0));
if let Some(mut driver) = self.inner.shared.driver.try_lock() {
driver.park_timeout(handle, duration)
}
}
pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
self.inner.shutdown(handle);
}
}
impl Clone for Parker {
    /// Produces an independent parker: it starts in the `EMPTY` state with its
    /// own mutex and condition variable, and only the `Shared` data (the
    /// driver) is common with `self`.
    fn clone(&self) -> Parker {
        let state = AtomicUsize::new(EMPTY);
        let mutex = Mutex::new(());
        let condvar = Condvar::new();
        let shared = Arc::clone(&self.inner.shared);

        Parker {
            inner: Arc::new(Inner {
                state,
                mutex,
                condvar,
                shared,
            }),
        }
    }
}
impl Unparker {
pub(crate) fn unpark(&self, driver: &driver::Handle) {
self.inner.unpark(driver);
}
}
impl Inner {
/// Parks the calling thread until a notification is available, consuming it.
///
/// A pending `NOTIFIED` is consumed without blocking. Otherwise the thread
/// blocks in the driver if the driver lock can be acquired, or on the
/// condition variable if it cannot.
fn park(&self, handle: &driver::Handle) {
// Fast path: make up to three attempts to consume an already-pending
// notification, yielding the thread after each failed attempt.
for _ in 0..3 {
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
thread::yield_now();
}
// Whoever wins the try-lock parks on the driver; everyone else falls back to
// the condition variable rather than waiting for the lock.
if let Some(mut driver) = self.shared.driver.try_lock() {
self.park_driver(&mut driver, handle);
} else {
self.park_condvar();
}
}
/// Blocks on the condition variable until `state` becomes `NOTIFIED`.
///
/// # Panics
///
/// Panics if `state` is neither `EMPTY` nor `NOTIFIED` on entry, or if
/// `Condvar::wait` returns an error.
fn park_condvar(&self) {
// Take the mutex *before* publishing PARKED_CONDVAR. `unpark_condvar` locks
// the same mutex before notifying, so it cannot signal in the window between
// the state change below and this thread actually waiting.
let mut m = self.mutex.lock();
match self
.state
.compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// A notification arrived after the fast path in `park`. Consume it with a
// swap (a read-modify-write rather than a plain store) so the previous
// value is observed; it is expected to still be NOTIFIED.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
// Releases the mutex while blocked and re-acquires it before returning.
m = self.condvar.wait(m).unwrap();
// Only a wake-up that finds NOTIFIED counts; consume it and return.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
// Otherwise the wake-up carried no notification (e.g. a spurious wake-up or
// the `notify_all` in `shutdown`): wait again.
}
}
/// Blocks inside the driver until it returns, then resets `state` to `EMPTY`.
///
/// The caller must already hold the driver lock and passes the guarded driver
/// in as `driver`.
///
/// # Panics
///
/// Panics if `state` is neither `EMPTY` nor `NOTIFIED` on entry, or is
/// neither `NOTIFIED` nor `PARKED_DRIVER` after the driver returns.
fn park_driver(&self, driver: &mut Driver, handle: &driver::Handle) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
{
Ok(_) => {}
Err(NOTIFIED) => {
// Same late-notification handling as in `park_condvar`: consume via swap so
// the previous value is read.
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
// While in PARKED_DRIVER, `unpark` wakes this thread via `driver.unpark()` on
// the handle it was given.
driver.park(handle);
// Back from the driver: reset to EMPTY whether or not a notification arrived
// (NOTIFIED = notified, PARKED_DRIVER = driver returned for another reason).
// NOTE(review): the panic text below says "park_timeout" although this is
// `park_driver`; left unchanged here.
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} PARKED_DRIVER => {} n => panic!("inconsistent park_timeout state: {}", n),
}
}
/// Sets `state` to `NOTIFIED` and wakes the parked thread, if any.
///
/// # Panics
///
/// Panics if the previous state is not one of the four known states.
fn unpark(&self, driver: &driver::Handle) {
// The swap is unconditional, so a notification is recorded even when nobody
// is parked yet. The previous state decides who, if anyone, must be woken:
// EMPTY (no one waiting) and NOTIFIED (already notified) need no wake-up.
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => {} NOTIFIED => {} PARKED_CONDVAR => self.unpark_condvar(),
PARKED_DRIVER => driver.unpark(),
actual => panic!("inconsistent state in unpark; actual = {}", actual),
}
}
/// Wakes a thread blocked in `park_condvar`.
fn unpark_condvar(&self) {
// The parking thread holds `mutex` from before it sets PARKED_CONDVAR until
// `Condvar::wait` releases it. Acquiring the mutex here therefore waits until
// that thread is really waiting, so the notification cannot be lost. The
// guard is dropped before notifying, so the mutex is free when the parked
// thread wakes and re-acquires it.
drop(self.mutex.lock());
self.condvar.notify_one()
}
/// Shuts down the driver if its lock can be acquired, then wakes every thread
/// waiting on this parker's condition variable.
fn shutdown(&self, handle: &driver::Handle) {
// Best effort: if the driver lock cannot be acquired, `driver.shutdown` is
// not called from here.
if let Some(mut driver) = self.shared.driver.try_lock() {
driver.shutdown(handle);
}
// `state` is not changed here, so a thread woken from `park_condvar` returns
// only if it then finds NOTIFIED.
self.condvar.notify_all();
}
}