use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use std::fmt;
use std::sync::atomic::Ordering::{self, SeqCst};
pub(super) struct Idle {
state: AtomicUsize,
sleepers: Mutex<Vec<usize>>,
num_workers: usize,
}
const UNPARK_SHIFT: usize = 16;
const UNPARK_MASK: usize = !SEARCH_MASK;
const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
#[derive(Copy, Clone)]
struct State(usize);
impl Idle {
pub(super) fn new(num_workers: usize) -> Idle {
let init = State::new(num_workers);
Idle {
state: AtomicUsize::new(init.into()),
sleepers: Mutex::new(Vec::with_capacity(num_workers)),
num_workers,
}
}
pub(super) fn worker_to_notify(&self) -> Option<usize> {
if !self.notify_should_wakeup() {
return None;
}
let mut sleepers = self.sleepers.lock();
if !self.notify_should_wakeup() {
return None;
}
State::unpark_one(&self.state);
let ret = sleepers.pop();
debug_assert!(ret.is_some());
ret
}
pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
let mut sleepers = self.sleepers.lock();
let ret = State::dec_num_unparked(&self.state, is_searching);
sleepers.push(worker);
ret
}
pub(super) fn transition_worker_to_searching(&self) -> bool {
let state = State::load(&self.state, SeqCst);
if 2 * state.num_searching() >= self.num_workers {
return false;
}
State::inc_num_searching(&self.state, SeqCst);
true
}
pub(super) fn transition_worker_from_searching(&self) -> bool {
State::dec_num_searching(&self.state)
}
pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
let mut sleepers = self.sleepers.lock();
for index in 0..sleepers.len() {
if sleepers[index] == worker_id {
sleepers.swap_remove(index);
State::unpark_one(&self.state);
return;
}
}
}
pub(super) fn is_parked(&self, worker_id: usize) -> bool {
let sleepers = self.sleepers.lock();
sleepers.contains(&worker_id)
}
fn notify_should_wakeup(&self) -> bool {
let state = State(self.state.fetch_add(0, SeqCst));
state.num_searching() == 0 && state.num_unparked() < self.num_workers
}
}
impl State {
fn new(num_workers: usize) -> State {
let ret = State(num_workers << UNPARK_SHIFT);
debug_assert_eq!(num_workers, ret.num_unparked());
debug_assert_eq!(0, ret.num_searching());
ret
}
fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
State(cell.load(ordering))
}
fn unpark_one(cell: &AtomicUsize) {
cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
}
fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
cell.fetch_add(1, ordering);
}
fn dec_num_searching(cell: &AtomicUsize) -> bool {
let state = State(cell.fetch_sub(1, SeqCst));
state.num_searching() == 1
}
fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
let mut dec = 1 << UNPARK_SHIFT;
if is_searching {
dec += 1;
}
let prev = State(cell.fetch_sub(dec, SeqCst));
is_searching && prev.num_searching() == 1
}
fn num_searching(self) -> usize {
self.0 & SEARCH_MASK
}
fn num_unparked(self) -> usize {
(self.0 & UNPARK_MASK) >> UNPARK_SHIFT
}
}
impl From<usize> for State {
fn from(src: usize) -> State {
State(src)
}
}
impl From<State> for usize {
fn from(src: State) -> usize {
src.0
}
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("worker::State")
.field("num_unparked", &self.num_unparked())
.field("num_searching", &self.num_searching())
.finish()
}
}
#[test]
fn test_state() {
assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
let state = State::new(10);
assert_eq!(10, state.num_unparked());
assert_eq!(0, state.num_searching());
}