1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use std::sync::atomic::{
    AtomicBool,
    Ordering::{Acquire, Release},
};
use std::thread::{self, Thread};

use conquer_util::BackOff;

use crate::cell::Block;
use crate::state::{
    AtomicOnceState,
    OnceState::{Ready, WouldBlock},
    Waiter,
};

use crate::Internal;
use crate::POISON_PANIC_MSG;

////////////////////////////////////////////////////////////////////////////////////////////////////
// ParkThread
////////////////////////////////////////////////////////////////////////////////////////////////////

/// Blocking strategy that puts waiting threads to sleep through the operating
/// system's thread parking facility and wakes them again by un-parking them.
///
/// This is a zero-sized marker type; the behavior lives in its trait
/// implementations.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct ParkThread;

/********** impl Internal *************************************************************************/

// `Internal` is declared elsewhere in the crate and requires no items here.
// NOTE(review): presumably it restricts which types may be used as blocking
// strategies to those defined inside this crate — confirm at its declaration.
impl Internal for ParkThread {}

/********** impl Block ****************************************************************************/

impl Block for ParkThread {
    /// Blocks the calling thread until `state` is observed as `Ready`.
    ///
    /// The thread first spins on `state` using a `BackOff`. Once the back-off
    /// advises yielding while the state is still `WouldBlock`, the thread
    /// links a `StackWaiter` that lives on its own stack in front of the
    /// current waiter list and parks until `unblock` sets that waiter's
    /// `ready` flag.
    ///
    /// # Panics
    ///
    /// Panics with `POISON_PANIC_MSG` if `state.load()` returns an error, and
    /// panics if the state is anything other than `Ready` after the thread
    /// has been woken.
    #[inline]
    fn block(state: &AtomicOnceState) {
        // Phase 1: spin for a while in case the state becomes ready quickly.
        let backoff = BackOff::new();
        let head = loop {
            backoff.spin();

            let state = state.load().expect(POISON_PANIC_MSG);
            match state {
                // nothing to wait for (any more)
                Ready => return,
                // still blocked and the back-off advises to stop spinning:
                // remember the current list head and continue with parking
                WouldBlock(waiter) if backoff.advise_yield() => break waiter,
                // every other case (including `WouldBlock` while spinning is
                // still advised): try again
                _ => {}
            }
        };
        backoff.reset();

        // Phase 2: register this thread as a waiter. The node lives in this
        // stack frame and initially points at the head observed above;
        // `AtomicBool::default()` starts out as `false`.
        let mut waiter = StackWaiter {
            thread: Some(thread::current()),
            ready: AtomicBool::default(),
            next: head.into(),
        };

        let mut curr = head;
        // The address of the stack-local node is what gets published, so the
        // node must stay in place until `ready` has been observed as `true`.
        let new_head = Waiter::from(&mut waiter as *mut StackWaiter);

        // Retry until the node has been installed as the new list head.
        // NOTE(review): `try_swap_waiters` presumably is a compare-and-swap
        // that reports the actually observed state on failure — confirm.
        while let Err(error) = state.try_swap_waiters(curr, new_head) {
            if let WouldBlock(ptr) = error {
                // another waiter got in first: re-link behind the new head
                // and try again
                curr = ptr;
                waiter.next = ptr.into();
                backoff.spin();
            } else {
                // The state is no longer `WouldBlock`, so the node was never
                // published and there is nothing to wait for.
                // NOTE(review): presumably only `Ready` can be reported
                // here — confirm against `try_swap_waiters`.
                return;
            }
        }

        // Phase 3: sleep until `unblock` has flagged this node. The loop is
        // required because `thread::park` may return spuriously; the
        // `Acquire` load pairs with the `Release` store in `unblock`.
        while !waiter.ready.load(Acquire) {
            thread::park();
        }

        assert_eq!(state.load().expect(POISON_PANIC_MSG), Ready); // propagates poisoning
    }

    /// Wakes every thread registered in the waiter list starting at `waiter`.
    ///
    /// The list is walked from its head until a null `next` pointer is
    /// reached. For each node the parked thread's handle is taken out, the
    /// node's `ready` flag is set and the thread is un-parked.
    ///
    /// # Panics
    ///
    /// Panics if a node no longer holds a thread handle.
    #[inline]
    fn unblock(waiter: Waiter) {
        let mut queue: *mut StackWaiter = waiter.into();

        // SAFETY (as far as visible in this file): every node is a
        // `StackWaiter` on the stack of a thread inside `block`, which does
        // not leave its frame before it has observed `ready == true`, so a
        // node stays alive at least until the `Release` store below.
        // NOTE(review): `as_mut` creates a `&mut StackWaiter` while the
        // parked thread may concurrently load `ready` — confirm that this is
        // acceptable under the aliasing rules.
        unsafe {
            while let Some(curr) = queue.as_mut() {
                // read the successor first, the node must not be touched any
                // more once it has been flagged as ready
                queue = curr.next;

                // take the handle out before the store for the same reason
                let thread = curr.thread.take().unwrap();
                // the stack waiter could be dropped right after this store!
                curr.ready.store(true, Release);

                thread.unpark();
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////
// StackWaiter
////////////////////////////////////////////////////////////////////////////////////////////////////

/// Node of the singly linked list of threads waiting in `ParkThread::block`.
///
/// Each node is created as a local variable by the waiting thread itself and
/// is referenced by other threads only through its address.
// NOTE(review): the minimum alignment of 4 presumably guarantees that the two
// lowest address bits are zero so the state encoding can use them — confirm
// against `AtomicOnceState`.
#[repr(align(4))]
struct StackWaiter {
    /// Handle of the waiting thread; taken out by `unblock` right before the
    /// thread is un-parked.
    thread: Option<Thread>,
    /// Set to `true` (with `Release`) by `unblock`; polled (with `Acquire`)
    /// by the waiting thread to tell real wake-ups from spurious ones.
    ready: AtomicBool,
    /// Next node in the list; a null pointer terminates the list.
    next: *mut StackWaiter,
}

/********** impl From *****************************************************************************/

impl From<*mut StackWaiter> for Waiter {
    #[inline]
    fn from(waiter: *mut StackWaiter) -> Self {
        Self(waiter as usize)
    }
}

impl From<Waiter> for *mut StackWaiter {
    #[inline]
    fn from(waiter: Waiter) -> Self {
        waiter.0 as *mut StackWaiter
    }
}