#![cfg_attr(not(feature = "full"), allow(dead_code))]
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
#[derive(Debug)]
pub(crate) struct ParkThread {
inner: Arc<Inner>,
}
#[derive(Clone, Debug)]
pub(crate) struct UnparkThread {
inner: Arc<Inner>,
}
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}
const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;
tokio_thread_local! {
static CURRENT_PARKER: ParkThread = ParkThread::new();
}
#[cfg(loom)]
tokio_thread_local! {
static CURRENT_THREAD_PARK_COUNT: AtomicUsize = AtomicUsize::new(0);
}
impl ParkThread {
pub(crate) fn new() -> Self {
Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
}),
}
}
pub(crate) fn unpark(&self) -> UnparkThread {
let inner = self.inner.clone();
UnparkThread { inner }
}
pub(crate) fn park(&mut self) {
#[cfg(loom)]
CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
self.inner.park();
}
pub(crate) fn park_timeout(&mut self, duration: Duration) {
#[cfg(loom)]
CURRENT_THREAD_PARK_COUNT.with(|count| count.fetch_add(1, SeqCst));
#[cfg(not(tokio_wasm))]
self.inner.park_timeout(duration);
#[cfg(tokio_wasm)]
std::thread::sleep(duration);
}
pub(crate) fn shutdown(&mut self) {
self.inner.shutdown();
}
}
impl Inner {
fn park(&self) {
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
let mut m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park state; actual = {}", actual),
}
loop {
m = self.condvar.wait(m).unwrap();
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
}
}
fn park_timeout(&self, dur: Duration) {
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
return;
}
if dur == Duration::from_millis(0) {
return;
}
let m = self.mutex.lock();
match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
let old = self.state.swap(EMPTY, SeqCst);
debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
}
Err(actual) => panic!("inconsistent park_timeout state; actual = {}", actual),
}
let (_m, _result) = self.condvar.wait_timeout(m, dur).unwrap();
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} PARKED => {} n => panic!("inconsistent park_timeout state: {}", n),
}
}
fn unpark(&self) {
match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, NOTIFIED => return, PARKED => {} _ => panic!("inconsistent state in unpark"),
}
drop(self.mutex.lock());
self.condvar.notify_one()
}
fn shutdown(&self) {
self.condvar.notify_all();
}
}
impl Default for ParkThread {
fn default() -> Self {
Self::new()
}
}
impl UnparkThread {
pub(crate) fn unpark(&self) {
self.inner.unpark();
}
}
use crate::loom::thread::AccessError;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::rc::Rc;
use std::task::{RawWaker, RawWakerVTable, Waker};
#[derive(Debug)]
pub(crate) struct CachedParkThread {
_anchor: PhantomData<Rc<()>>,
}
impl CachedParkThread {
pub(crate) fn new() -> CachedParkThread {
CachedParkThread {
_anchor: PhantomData,
}
}
pub(crate) fn waker(&self) -> Result<Waker, AccessError> {
self.unpark().map(|unpark| unpark.into_waker())
}
fn unpark(&self) -> Result<UnparkThread, AccessError> {
self.with_current(|park_thread| park_thread.unpark())
}
pub(crate) fn park(&mut self) {
self.with_current(|park_thread| park_thread.inner.park())
.unwrap();
}
pub(crate) fn park_timeout(&mut self, duration: Duration) {
self.with_current(|park_thread| park_thread.inner.park_timeout(duration))
.unwrap();
}
fn with_current<F, R>(&self, f: F) -> Result<R, AccessError>
where
F: FnOnce(&ParkThread) -> R,
{
CURRENT_PARKER.try_with(|inner| f(inner))
}
pub(crate) fn block_on<F: Future>(&mut self, f: F) -> Result<F::Output, AccessError> {
use std::task::Context;
use std::task::Poll::Ready;
let waker = self.waker()?;
let mut cx = Context::from_waker(&waker);
pin!(f);
loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}
#[cfg(feature = "rt")]
crate::runtime::context::with_defer(|defer| defer.wake());
self.park();
}
}
}
impl UnparkThread {
pub(crate) fn into_waker(self) -> Waker {
unsafe {
let raw = unparker_to_raw_waker(self.inner);
Waker::from_raw(raw)
}
}
}
impl Inner {
#[allow(clippy::wrong_self_convention)]
fn into_raw(this: Arc<Inner>) -> *const () {
Arc::into_raw(this) as *const ()
}
unsafe fn from_raw(ptr: *const ()) -> Arc<Inner> {
Arc::from_raw(ptr as *const Inner)
}
}
unsafe fn unparker_to_raw_waker(unparker: Arc<Inner>) -> RawWaker {
RawWaker::new(
Inner::into_raw(unparker),
&RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker),
)
}
unsafe fn clone(raw: *const ()) -> RawWaker {
let unparker = Inner::from_raw(raw);
mem::forget(unparker.clone());
unparker_to_raw_waker(unparker)
}
unsafe fn drop_waker(raw: *const ()) {
let _ = Inner::from_raw(raw);
}
unsafe fn wake(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
}
unsafe fn wake_by_ref(raw: *const ()) {
let unparker = Inner::from_raw(raw);
unparker.unpark();
mem::forget(unparker);
}
#[cfg(loom)]
pub(crate) fn current_thread_park_count() -> usize {
CURRENT_THREAD_PARK_COUNT.with(|count| count.load(SeqCst))
}