use crate::loom::sync::{Condvar, Mutex};
use std::collections::VecDeque;
use std::sync::atomic::AtomicBool;
#[cfg(loom)]
use std::sync::atomic::AtomicUsize;
#[cfg(loom)]
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::Ordering::{Acquire, Release};
use std::time::Duration;
use super::pool::Task;
#[cfg(not(loom))]
const NUM_SHARDS: usize = 16;
#[cfg(loom)]
const NUM_SHARDS: usize = 1;
struct Shard {
queue: Mutex<VecDeque<Task>>,
}
impl Shard {
fn new() -> Self {
Shard {
queue: Mutex::new(VecDeque::new()),
}
}
fn push(&self, task: Task) {
let mut queue = self.queue.lock();
while queue.len() == queue.capacity() {
let current_len = queue.len();
let new_cap = current_len.saturating_mul(2).max(4);
drop(queue);
let mut new_queue = VecDeque::with_capacity(new_cap);
queue = self.queue.lock();
if queue.len() == queue.capacity() {
if new_queue.capacity() > queue.len() {
new_queue.extend(queue.drain(..));
*queue = new_queue;
break;
}
} else {
break;
}
}
queue.push_back(task);
}
fn pop(&self) -> Option<Task> {
let mut queue = self.queue.lock();
queue.pop_front()
}
}
pub(super) struct ShardedQueue {
shards: [Shard; NUM_SHARDS],
#[cfg(loom)]
push_index: AtomicUsize,
shutdown: AtomicBool,
condvar: Condvar,
condvar_mutex: Mutex<u32>,
}
impl ShardedQueue {
pub(super) fn new() -> Self {
ShardedQueue {
shards: std::array::from_fn(|_| Shard::new()),
#[cfg(loom)]
push_index: AtomicUsize::new(0),
shutdown: AtomicBool::new(false),
condvar: Condvar::new(),
condvar_mutex: Mutex::new(0),
}
}
#[cfg(not(loom))]
fn next_push_index(&self, num_shards: usize) -> usize {
crate::runtime::context::thread_rng_n(num_shards as u32) as usize
}
#[cfg(loom)]
fn next_push_index(&self, num_shards: usize) -> usize {
self.push_index.fetch_add(1, Relaxed) & (num_shards - 1)
}
pub(super) fn push(&self, task: Task) {
let index = self.next_push_index(NUM_SHARDS);
self.shards[index].push(task);
}
pub(super) fn notify_one(&self) {
let mut guard = self.condvar_mutex.lock();
*guard += 1;
drop(guard);
self.condvar.notify_one();
}
pub(super) fn pop(&self, preferred_shard: usize) -> Option<Task> {
let start = preferred_shard % NUM_SHARDS;
for i in 0..NUM_SHARDS {
let index = (start + i) % NUM_SHARDS;
if let Some(task) = self.shards[index].pop() {
return Some(task);
}
}
None
}
pub(super) fn shutdown(&self) {
{
let _guard = self.condvar_mutex.lock();
self.shutdown.store(true, Release);
}
self.condvar.notify_all();
}
pub(super) fn is_shutdown(&self) -> bool {
self.shutdown.load(Acquire)
}
pub(super) fn wait_for_task(&self, preferred_shard: usize, timeout: Duration) -> WaitResult {
let mut guard = self.condvar_mutex.lock();
loop {
if self.is_shutdown() {
return WaitResult::Shutdown;
}
if *guard > 0 {
*guard -= 1;
drop(guard);
if let Some(task) = self.pop(preferred_shard) {
return WaitResult::Task(task);
}
guard = self.condvar_mutex.lock();
continue;
}
let (g, timeout_result) = self.condvar.wait_timeout(guard, timeout).unwrap();
guard = g;
if timeout_result.timed_out() && *guard == 0 {
if self.is_shutdown() {
return WaitResult::Shutdown;
}
return WaitResult::Timeout;
}
}
}
}
pub(super) enum WaitResult {
Task(Task),
Timeout,
Shutdown,
}