use pool::Pool;
use task::{BlockingState, Task};
use futures::{Async, Poll};
use std::cell::UnsafeCell;
use std::fmt;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
use std::thread;
/// Coordinates a bounded pool of "blocking" capacity.
///
/// `state` packs either the remaining capacity (low bit set) or a pointer
/// to the most recently queued waiting `Task` (low bit clear); see `State`
/// below. Waiting tasks form an intrusive MPSC queue threaded through
/// `Task::next_blocking`, with `stub` as the queue's sentinel node and
/// `tail` as the consumer-side cursor.
#[derive(Debug)]
pub(crate) struct Blocking {
    // Packed `State` word: remaining capacity, or the queue-head task
    // pointer once capacity is exhausted.
    state: AtomicUsize,
    // Consumer-side pointer to the next node to pop (may be the stub).
    // Only accessed by the thread holding `lock`, hence `UnsafeCell`.
    tail: UnsafeCell<*mut Task>,
    // Sentinel node; never a real task. Keeps the intrusive queue
    // structurally non-empty.
    stub: Box<Task>,
    // Counter-style lock guarding `tail`; also counts pending
    // notifications (see `notify_task`).
    lock: AtomicUsize,
}
/// Outcome of asking whether a task may enter a blocking section.
///
/// NOTE(review): no consumer of these variants is visible in this file;
/// the meanings below are inferred from the names — confirm against the
/// `task::BlockingState` call sites.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum CanBlock {
    // Capacity has already been allocated to the task (presumably).
    Allocated,
    // The task may request capacity (presumably).
    CanRequest,
    // No capacity is currently available (presumably).
    NoCapacity,
}
/// Packed representation of `Blocking::state`.
///
/// If the low bit (`NUM_FLAG`) is set, the remaining capacity is stored in
/// the upper bits (`value >> NUM_SHIFT`). If the low bit is clear, the whole
/// word is a `*const Task` pointer to the most recently pushed waiter — or
/// to the stub node when capacity just ran out and nothing is queued.
/// Task allocations are aligned, so real pointers never have bit zero set
/// (asserted in `Blocking::new` and `State::set_ptr`).
#[derive(Copy, Clone, Eq, PartialEq)]
struct State(usize);
// Low-bit tag: set => the word is a capacity count, clear => a pointer.
const NUM_FLAG: usize = 1;
// Shift applied to the capacity count to make room for `NUM_FLAG`.
const NUM_SHIFT: usize = 1;
impl Blocking {
    /// Creates a `Blocking` pool with `capacity` blocking slots.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn new(capacity: usize) -> Blocking {
        assert!(capacity > 0, "blocking capacity must be greater than zero");
        // The stub node anchors the intrusive waiter queue; `tail` starts
        // out pointing at it.
        let stub = Box::new(Task::stub());
        let ptr = &*stub as *const _ as *mut _;
        // Bit zero must be clear: it is the tag distinguishing "capacity"
        // words from "pointer" words (see `NUM_FLAG`).
        debug_assert!(ptr as usize & NUM_FLAG == 0);
        // Initial state: full capacity available, nothing queued.
        let init = State::new(capacity);
        Blocking {
            state: AtomicUsize::new(init.into()),
            tail: UnsafeCell::new(ptr),
            stub: stub,
            lock: AtomicUsize::new(0),
        }
    }
    /// Attempts to reserve one unit of blocking capacity for `task`.
    ///
    /// Returns `Ok(Async::Ready(()))` when capacity was claimed, or
    /// `Ok(Async::NotReady)` after enqueueing the task as a waiter (it is
    /// notified later via `notify_task`). This body never returns `Err`.
    pub fn poll_blocking_capacity(&self, task: &Arc<Task>) -> Poll<(), ::BlockingError> {
        // The task must not already be sitting in the waiter queue.
        debug_assert!(!BlockingState::from(task.blocking.load(Acquire)).is_queued());
        // Raw pointer to a cloned `Arc<Task>` once we commit to queueing;
        // that strong count is owned by the queue until popped (or released
        // below if we end up claiming capacity after all).
        let mut strong: Option<*const Task> = None;
        let mut curr: State = self.state.load(Acquire).into();
        loop {
            let mut next = curr;
            if !next.claim_capacity(&self.stub) {
                // No capacity: `state` currently holds the queue-head
                // pointer, so prepare to push ourselves as the new head.
                debug_assert!(curr.ptr().is_some());
                if strong.is_none() {
                    // First failed attempt: mark the task queued and take
                    // the strong reference the queue will own.
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(!prev.is_queued());
                    strong = Some(Arc::into_raw(task.clone()));
                    task.next_blocking.store(ptr::null_mut(), Relaxed);
                }
                let ptr = strong.unwrap();
                next.set_ptr(ptr);
            }
            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);
            let actual = self
                .state
                .compare_and_swap(curr.into(), next.into(), AcqRel)
                .into();
            if curr == actual {
                break;
            }
            // Lost the race; retry against the freshly observed state.
            curr = actual;
        }
        // `curr` is the state we successfully swapped out.
        match curr.ptr() {
            Some(prev) => {
                // We pushed onto the waiter queue. Link the previous head
                // to our node; `Release` publishes the node's
                // initialization to the consumer in `pop`.
                let ptr = strong.unwrap();
                unsafe {
                    (*prev).next_blocking.store(ptr as *mut _, Release);
                }
                Ok(Async::NotReady)
            }
            None => {
                // Capacity was claimed. If an earlier iteration already
                // created the queue reference, release it and clear the
                // queued flag again.
                debug_assert!(curr.remaining_capacity() > 0);
                if let Some(ptr) = strong {
                    let _ = unsafe { Arc::from_raw(ptr) };
                    let prev = BlockingState::toggle_queued(&task.blocking, Relaxed);
                    debug_assert!(prev.is_queued());
                }
                Ok(().into())
            }
        }
    }
    /// Re-inserts the stub node at the head of the waiter queue.
    ///
    /// Called by the consumer (`pop`) right before popping the last real
    /// node, so the intrusive queue never becomes structurally empty.
    ///
    /// # Safety
    ///
    /// Must only be called by the `lock` holder, and only when `state` is
    /// known to hold a task pointer (debug-asserted below).
    unsafe fn push_stub(&self) {
        let task: *mut Task = &*self.stub as *const _ as *mut _;
        (*task).next_blocking.store(ptr::null_mut(), Relaxed);
        // Swap the stub in as the new queue head...
        let prev = self.state.swap(task as usize, AcqRel);
        debug_assert!(State::from(prev).is_ptr());
        let prev = prev as *const Task;
        debug_assert_ne!(prev, task);
        // ...then link the old head to it.
        (*prev).next_blocking.store(task, Release);
    }
    /// Releases one unit of capacity, notifying a queued waiter if any.
    ///
    /// `lock` doubles as a notification counter: the caller that bumps it
    /// from zero becomes the sole consumer and drains on behalf of every
    /// concurrent caller; everyone else just records their notification
    /// and returns.
    pub fn notify_task(&self, pool: &Arc<Pool>) {
        let prev = self.lock.fetch_add(1, AcqRel);
        if prev != 0 {
            // Another thread is draining; it will observe our increment
            // and process this notification too.
            return;
        }
        // Number of notifications this pass is responsible for.
        let mut dec = 1;
        loop {
            let mut remaining_pops = dec;
            while remaining_pops > 0 {
                remaining_pops -= 1;
                // `remaining_pops` tells `pop` how much extra capacity to
                // return in bulk if the queue turns out to be empty.
                let task = match self.pop(remaining_pops) {
                    Some(t) => t,
                    None => break,
                };
                Task::notify_blocking(task, pool);
            }
            // Drop the notifications we handled; if more arrived while we
            // were popping, go around again for the difference.
            let actual = self.lock.fetch_sub(dec, AcqRel);
            if actual == dec {
                break;
            }
            debug_assert!(actual > dec);
            dec = actual - dec;
        }
    }
    /// Pops one queued waiter, or returns `None` after crediting `rem + 1`
    /// units of capacity back to `state` when the queue is empty.
    ///
    /// Must only be called by the `lock` holder (single consumer).
    fn pop(&self, rem: usize) -> Option<Arc<Task>> {
        'outer: loop {
            unsafe {
                let mut tail = *self.tail.get();
                let mut next = (*tail).next_blocking.load(Acquire);
                let stub = &*self.stub as *const _ as *mut _;
                if tail == stub {
                    if next.is_null() {
                        // Queue looks empty; try to convert this pop (and
                        // the caller's remaining pops) into returned
                        // capacity instead.
                        let mut curr: State = self.state.load(Acquire).into();
                        loop {
                            if curr.has_task(&self.stub) {
                                // A producer pushed concurrently; retry
                                // the pop from the top.
                                thread::yield_now();
                                continue 'outer;
                            }
                            let mut after = curr;
                            // `+ 1` accounts for the pop we are skipping.
                            after.add_capacity(rem + 1, &self.stub);
                            let actual: State = self
                                .state
                                .compare_and_swap(curr.into(), after.into(), AcqRel)
                                .into();
                            if actual == curr {
                                return None;
                            }
                            curr = actual;
                        }
                    }
                    // Skip past the stub to the first real node.
                    *self.tail.get() = next;
                    tail = next;
                    next = (*next).next_blocking.load(Acquire);
                }
                if !next.is_null() {
                    // Fast path: `tail` has a successor, so it is safe to
                    // pop; reclaim the strong ref the producer pushed.
                    *self.tail.get() = next;
                    return Some(Arc::from_raw(tail));
                }
                // `tail` appears to be the last node. It may only be
                // popped if it is also the head recorded in `state`;
                // otherwise a push is mid-flight.
                let state = self.state.load(Acquire);
                debug_assert!(State::from(state).is_ptr());
                if state != tail as usize {
                    // Wait for the racing producer to finish linking.
                    thread::yield_now();
                    continue 'outer;
                }
                // Re-insert the stub so the queue stays non-empty, then
                // pop `tail` once its link to the stub becomes visible.
                self.push_stub();
                next = (*tail).next_blocking.load(Acquire);
                if !next.is_null() {
                    *self.tail.get() = next;
                    return Some(Arc::from_raw(tail));
                }
                // The stub link is not visible yet; spin.
                thread::yield_now();
            }
        }
    }
}
impl State {
    /// Builds the initial state: `capacity` slots free, nothing queued.
    fn new(capacity: usize) -> State {
        let bits = (capacity << NUM_SHIFT) | NUM_FLAG;
        State(bits)
    }
    /// Number of free slots encoded in this state (zero when the word is
    /// a queue pointer).
    fn remaining_capacity(&self) -> usize {
        if self.has_remaining_capacity() {
            self.0 >> NUM_SHIFT
        } else {
            0
        }
    }
    /// True when the word is tagged as a capacity count.
    fn has_remaining_capacity(&self) -> bool {
        self.0 & NUM_FLAG != 0
    }
    /// True when the word is a pointer to a *real* queued task — i.e. it
    /// is neither a capacity count nor the stub sentinel.
    fn has_task(&self, stub: &Task) -> bool {
        !self.has_remaining_capacity() && !self.is_stub(stub)
    }
    /// True when the word is exactly the stub node's address.
    fn is_stub(&self, stub: &Task) -> bool {
        stub as *const Task as usize == self.0
    }
    /// Consumes one unit of capacity. Returns `false` (leaving `self`
    /// untouched) when the word is a pointer. When the last unit is
    /// taken, the word flips to the stub pointer ("empty queue").
    fn claim_capacity(&mut self, stub: &Task) -> bool {
        if !self.has_remaining_capacity() {
            return false;
        }
        // Capacity of zero would be encoded as bare NUM_FLAG (== 1),
        // which never occurs: it is replaced by the stub pointer below.
        debug_assert!(self.0 != 1);
        self.0 -= 1 << NUM_SHIFT;
        if self.0 == NUM_FLAG {
            self.0 = stub as *const _ as usize;
        }
        true
    }
    /// Adds `capacity` free slots. Succeeds when the word is the stub
    /// (queue empty) or already a capacity count; fails when a real task
    /// is queued — the capacity must go to the waiter instead.
    fn add_capacity(&mut self, capacity: usize, stub: &Task) -> bool {
        debug_assert!(capacity > 0);
        if self.is_stub(stub) {
            self.0 = (capacity << NUM_SHIFT) | NUM_FLAG;
            return true;
        }
        if self.has_remaining_capacity() {
            self.0 += capacity << NUM_SHIFT;
            return true;
        }
        false
    }
    /// True when the word is tagged as a pointer (stub or real task).
    fn is_ptr(&self) -> bool {
        !self.has_remaining_capacity()
    }
    /// The pointer stored in this state, if it is pointer-tagged.
    fn ptr(&self) -> Option<*const Task> {
        match self.is_ptr() {
            true => Some(self.0 as *const Task),
            false => None,
        }
    }
    /// Stores a task pointer; bit zero must be clear so the tag survives.
    fn set_ptr(&mut self, ptr: *const Task) {
        let bits = ptr as usize;
        debug_assert!(bits & NUM_FLAG == 0);
        self.0 = bits
    }
}
/// Reinterprets a raw word loaded from the atomic as a `State`.
impl From<usize> for State {
    fn from(bits: usize) -> State {
        State(bits)
    }
}
/// Unpacks a `State` back into the raw word stored in the atomic.
impl From<State> for usize {
    fn from(state: State) -> usize {
        state.0
    }
}
/// Renders the state as either `{ ptr: .. }` or `{ remaining: .. }`
/// depending on which variant the tag bit selects.
impl fmt::Debug for State {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("State");
        match self.is_ptr() {
            true => out.field("ptr", &self.0),
            false => out.field("remaining", &self.remaining_capacity()),
        };
        out.finish()
    }
}