#[cfg(feature = "nightly")]
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
#[cfg(not(feature = "nightly"))]
use stable::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::Instant;
use std::cell::Cell;
use std::ptr;
use std::mem;
use smallvec::SmallVec8;
use thread_parker::ThreadParker;
use word_lock::WordLock;
// Number of live threads that have created parking-lot state; used to decide
// when the hash table must grow.
static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
// Pointer to the current `HashTable`, stored as a usize (0 = not yet
// allocated). Superseded tables are kept reachable via `HashTable::_prev`.
static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;
// Per-thread parking state, created lazily on first use.
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
// The table is sized to hold at least LOAD_FACTOR buckets per thread.
const LOAD_FACTOR: usize = 3;
// Hash table mapping park keys to buckets of queued threads.
struct HashTable {
    // Buckets; the length is always a power of two.
    entries: Box<[Bucket]>,
    // Number of top hash bits used to index `entries` (log2 of its length).
    hash_bits: u32,
    // The table this one replaced (null for the first table). Kept so that
    // old tables stay reachable; they are never freed after being replaced.
    _prev: *const HashTable,
}
impl HashTable {
    /// Allocate a table sized for `num_threads` threads: at least
    /// `LOAD_FACTOR` buckets per thread, rounded up to a power of two.
    ///
    /// `prev` is the table being replaced (or null for the first table); it
    /// is stored so the old allocation stays reachable.
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        // log2(new_size) = bit width of usize minus leading zeros minus 1.
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
        let bucket = Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            // FIX: `mem::uninitialized()` produces an uninitialized value,
            // which is undefined behavior (and deprecated). The padding is
            // never read, so zero-initializing it is equivalent and sound.
            _padding: [0; 64],
        };
        Box::new(HashTable {
            entries: vec![bucket; new_size].into_boxed_slice(),
            hash_bits: hash_bits,
            _prev: prev,
        })
    }
}
// One hash-table slot: a lock plus an intrusive FIFO queue of parked threads.
struct Bucket {
    // Word-sized lock protecting the queue fields below.
    mutex: WordLock,
    // Head/tail of a singly linked queue of parked threads, linked through
    // `ThreadData::next_in_queue`. Both are null when the queue is empty.
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,
    // Pads the bucket, presumably to reduce false sharing between adjacent
    // buckets — size/alignment effect not verified here.
    _padding: [u8; 64],
}
impl Clone for Bucket {
    /// Used only by `vec![bucket; n]` when building a fresh table: each
    /// clone starts with its own new lock and an empty queue rather than
    /// copying the source bucket's state.
    fn clone(&self) -> Bucket {
        Bucket {
            mutex: WordLock::new(),
            queue_head: Cell::new(ptr::null()),
            queue_tail: Cell::new(ptr::null()),
            // FIX: `mem::uninitialized()` is undefined behavior; the padding
            // is never read, so zeroes are an equivalent, sound choice.
            _padding: [0; 64],
        }
    }
}
// Per-thread parking state, stored in the THREAD_DATA thread-local.
struct ThreadData {
    // Primitive used to actually block and wake this thread.
    parker: ThreadParker,
    // Key this thread is parked on (meaningful only while it is queued).
    key: Cell<usize>,
    // Intrusive link to the next thread in the same bucket queue.
    next_in_queue: Cell<*const ThreadData>,
}
impl ThreadData {
    /// Build the calling thread's parking state.
    ///
    /// Registers the thread in the global count and grows the global hash
    /// table if the new count makes it too small.
    fn new() -> ThreadData {
        // fetch_add returns the previous count, so +1 yields the total
        // including this thread.
        unsafe {
            grow_hashtable(NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1);
        }
        ThreadData {
            parker: ThreadParker::new(),
            key: Cell::new(0),
            next_in_queue: Cell::new(ptr::null()),
        }
    }
}
impl Drop for ThreadData {
    // Unregister the thread from the global count on thread exit. The hash
    // table is never shrunk, so no table maintenance is needed here.
    fn drop(&mut self) {
        NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
    }
}
// Grow the hash table so that it is big enough for `num_threads` threads.
// This is a no-op if the current table is already large enough.
unsafe fn grow_hashtable(num_threads: usize) {
    // If there is no table yet, race to install the first one.
    if HASHTABLE.load(Ordering::Relaxed) == 0 {
        let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));
        if HASHTABLE.compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
            .is_ok() {
            return;
        }
        // Lost the race: another thread installed a table first; free ours.
        Box::from_raw(new_table);
    }
    let mut old_table;
    loop {
        old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;
        // Another thread may already have grown the table enough.
        if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
            return;
        }
        // Lock every bucket so no queue can be mutated while entries are
        // moved to the new table.
        for b in &(*old_table).entries[..] {
            b.mutex.lock();
        }
        // If the table was swapped while we were acquiring the locks, drop
        // them all and retry against the new table.
        if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
            break;
        }
        for b in &(*old_table).entries[..] {
            b.mutex.unlock();
        }
    }
    // The new table records the old one in `_prev`, so readers that loaded
    // the old pointer never dereference freed memory (old tables leak).
    let new_table = HashTable::new(num_threads, old_table);
    // Re-hash every queued thread into the new table, preserving FIFO order
    // within each destination bucket.
    for b in &(*old_table).entries[..] {
        let mut current = b.queue_head.get();
        while !current.is_null() {
            let next = (*current).next_in_queue.get();
            let hash = hash((*current).key.get(), new_table.hash_bits);
            if new_table.entries[hash].queue_tail.get().is_null() {
                new_table.entries[hash].queue_head.set(current);
            } else {
                (*new_table.entries[hash].queue_tail.get()).next_in_queue.set(current);
            }
            new_table.entries[hash].queue_tail.set(current);
            (*current).next_in_queue.set(ptr::null());
            current = next;
        }
    }
    // Publish the new table first, then release the old buckets; lockers
    // re-check HASHTABLE after acquiring a bucket lock and will retry.
    HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);
    for b in &(*old_table).entries[..] {
        b.mutex.unlock();
    }
}
/// Map a park key to a bucket index by multiplicative (Fibonacci) hashing:
/// multiply by a 32-bit constant and keep the top `bits` bits.
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
    let mixed = key.wrapping_mul(0x9E3779B9);
    mixed >> (32 - bits)
}
/// Map a park key to a bucket index by multiplicative (Fibonacci) hashing:
/// multiply by a 64-bit constant and keep the top `bits` bits.
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
    let mixed = key.wrapping_mul(0x9E3779B97F4A7C15);
    mixed >> (64 - bits)
}
/// Find the bucket for `key` in the current hash table and lock it.
///
/// Returns `None` if no table has been allocated yet. Because the table can
/// be swapped by `grow_hashtable` between loading the pointer and taking the
/// bucket lock, the pointer is re-checked after locking and the lookup is
/// retried on a mismatch.
unsafe fn lock_bucket<'a>(key: usize) -> Option<&'a Bucket> {
    loop {
        let table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;
        if table.is_null() {
            return None;
        }
        let index = hash(key, (*table).hash_bits);
        let bucket = &(*table).entries[index];
        bucket.mutex.lock();
        // Raced with a table swap: drop the lock and start over.
        if HASHTABLE.load(Ordering::Relaxed) != table as usize {
            bucket.mutex.unlock();
            continue;
        }
        return Some(bucket);
    }
}
/// Result of an `unpark_one` operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum UnparkResult {
    /// No thread was parked on the given key.
    NoParkedThreads,
    /// A thread was unparked and no other thread remains queued on the key.
    UnparkedLast,
    /// A thread was unparked and at least one more remains queued on the key.
    UnparkedNotLast,
}
/// Park the current thread on `key` until it is unparked or `timeout` passes.
///
/// `validate` runs with the bucket lock held; if it returns false the thread
/// does not park and `false` is returned. `before_sleep` runs after the
/// thread is queued and the bucket lock is released, just before sleeping.
/// Returns `true` if the thread was unparked, `false` on validation failure
/// or timeout.
///
/// # Safety
/// The callbacks run inside the parking-lot protocol (`validate` with a
/// bucket lock held), so they must not recursively call into these parking
/// functions; `key` must match the corresponding unpark calls.
pub unsafe fn park(key: usize,
                   validate: &mut FnMut() -> bool,
                   before_sleep: &mut FnMut(),
                   timeout: Option<Instant>)
                   -> bool {
    THREAD_DATA.with(|thread_data| {
        // Touching THREAD_DATA ran ThreadData::new, which created the table,
        // so lock_bucket cannot return None here.
        let bucket = lock_bucket(key).unwrap();
        // Let the caller abort now that the bucket is locked.
        if !validate() {
            bucket.mutex.unlock();
            return false;
        }
        // Append this thread to the tail of the bucket's queue.
        thread_data.next_in_queue.set(ptr::null());
        thread_data.key.set(key);
        thread_data.parker.prepare_park();
        if !bucket.queue_head.get().is_null() {
            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
        } else {
            bucket.queue_head.set(thread_data);
        }
        bucket.queue_tail.set(thread_data);
        bucket.mutex.unlock();
        before_sleep();
        // Sleep until unparked; with a timeout, park_until reports whether
        // we were actually unparked.
        let unparked = match timeout {
            Some(timeout) => thread_data.parker.park_until(timeout),
            None => {
                thread_data.parker.park();
                true
            }
        };
        if unparked {
            return true;
        }
        // Timed out: re-lock the bucket (the table may have grown while we
        // slept) so we can remove our queue entry ourselves.
        let bucket = lock_bucket(key).unwrap();
        // NOTE(review): presumably `timed_out()` returning false here means
        // an unpark raced with the timeout and already dequeued us, so we
        // report a normal wakeup — confirm against ThreadParker.
        if !thread_data.parker.timed_out() {
            bucket.mutex.unlock();
            return true;
        }
        // Walk the queue and unlink our own entry, fixing up the tail if we
        // were the last node.
        let mut link = &bucket.queue_head;
        let mut current = bucket.queue_head.get();
        let mut previous = ptr::null();
        while !current.is_null() {
            if current == thread_data {
                let next = (*current).next_in_queue.get();
                link.set(next);
                if bucket.queue_tail.get() == current {
                    bucket.queue_tail.set(previous);
                }
                break;
            } else {
                link = &(*current).next_in_queue;
                previous = current;
                current = link.get();
            }
        }
        bucket.mutex.unlock();
        false
    })
}
/// Unpark one thread queued on `key`, if any.
///
/// `callback` is invoked with the result while the bucket lock is still
/// held, before the chosen thread is woken. Returns the same value that was
/// passed to the callback.
///
/// # Safety
/// `callback` runs with the bucket lock held and must not call back into
/// these parking functions.
pub unsafe fn unpark_one(key: usize, callback: &mut FnMut(UnparkResult)) -> UnparkResult {
    let bucket = match lock_bucket(key) {
        Some(x) => x,
        // No hash table yet means no thread has ever parked.
        None => {
            callback(UnparkResult::NoParkedThreads);
            return UnparkResult::NoParkedThreads;
        }
    };
    // Scan the bucket queue for the first thread parked on this key
    // (buckets are shared between different keys that hash together).
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    while !current.is_null() {
        if (*current).key.get() == key {
            // Unlink the matching thread, fixing up the tail if needed.
            let next = (*current).next_in_queue.get();
            link.set(next);
            let mut result = UnparkResult::UnparkedLast;
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            } else {
                // Not the tail: check whether any later entry shares this
                // key, to report UnparkedNotLast.
                let mut scan = next;
                while !scan.is_null() {
                    if (*scan).key.get() == key {
                        result = UnparkResult::UnparkedNotLast;
                        break;
                    }
                    scan = (*scan).next_in_queue.get();
                }
            }
            callback(result);
            // NOTE(review): unpark_lock is taken before releasing the bucket
            // mutex and handed to unpark afterwards; the exact handoff
            // contract lives in ThreadParker — confirm there.
            let lock = (*current).parker.unpark_lock();
            bucket.mutex.unlock();
            (*current).parker.unpark(lock);
            return result;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }
    // No queued thread matched this key.
    callback(UnparkResult::NoParkedThreads);
    bucket.mutex.unlock();
    UnparkResult::NoParkedThreads
}
/// Unpark every thread queued on `key`, returning how many were woken.
///
/// # Safety
/// `key` must match the corresponding park calls.
pub unsafe fn unpark_all(key: usize) -> usize {
    let bucket = match lock_bucket(key) {
        Some(x) => x,
        // No hash table yet means no thread has ever parked.
        None => return 0,
    };
    // Unlink every entry with a matching key, collecting each thread and its
    // unpark handle so the wakeups can happen after the lock is released.
    let mut link = &bucket.queue_head;
    let mut current = bucket.queue_head.get();
    let mut previous = ptr::null();
    let mut threads = SmallVec8::new();
    while !current.is_null() {
        if (*current).key.get() == key {
            let next = (*current).next_in_queue.get();
            link.set(next);
            if bucket.queue_tail.get() == current {
                bucket.queue_tail.set(previous);
            }
            threads.push((current, (*current).parker.unpark_lock()));
            current = next;
        } else {
            link = &(*current).next_in_queue;
            previous = current;
            current = link.get();
        }
    }
    bucket.mutex.unlock();
    // Wake the collected threads only after dropping the bucket lock,
    // presumably so they do not immediately contend on it — see ThreadParker
    // for the unpark_lock/unpark handoff contract.
    let num_threads = threads.len();
    for t in threads.into_iter() {
        (*t.0).parker.unpark(t.1);
    }
    num_threads
}