use alloc::{sync::Arc, task::Wake};
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use parking_lot::{Condvar, Mutex};
use crate::thread_pool::{ThreadPool, WorkerThread};
/// A one-shot signalling primitive: it starts unset and can later be set.
///
/// # Safety
///
/// `set` takes a raw pointer rather than `&self` because setting a latch can
/// allow another thread to proceed and invalidate the latch's memory.
/// Implementations must not access `this` after the point at which the latch
/// becomes observable as set.
pub trait Latch {
    /// Marks the latch as set.
    ///
    /// # Safety
    ///
    /// `this` must point to a live latch at the time of the call, and the
    /// caller must account for the fact that the pointed-to memory may be
    /// freed by a waiter as soon as the set state becomes visible.
    unsafe fn set(this: *const Self);
}

/// A latch whose state can be polled without blocking.
pub trait Probe {
    /// Returns `true` once the latch has been set.
    fn probe(&self) -> bool;
}

/// The simplest latch: a single atomic flag that is set once and polled.
pub struct AtomicLatch {
    // `false` = unset, `true` = set.
    state: AtomicBool,
}

impl AtomicLatch {
    /// Creates a latch in the unset state.
    #[inline]
    pub const fn new() -> Self {
        AtomicLatch {
            state: AtomicBool::new(false),
        }
    }

    /// Returns the latch to the unset state so it can be reused.
    #[inline]
    pub fn reset(&self) {
        self.state.store(false, Ordering::Release);
    }
}

impl Default for AtomicLatch {
    /// Equivalent to [`AtomicLatch::new`].
    fn default() -> Self {
        AtomicLatch::new()
    }
}

impl Latch for AtomicLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        // SAFETY: the caller guarantees `this` is valid; the store below is
        // the last access made through the pointer.
        unsafe {
            (*this).state.store(true, Ordering::Release);
        }
    }
}

impl Probe for AtomicLatch {
    #[inline]
    fn probe(&self) -> bool {
        // Acquire pairs with the Release store in `set`, so writes made
        // before `set` are visible once `probe` observes `true`.
        self.state.load(Ordering::Acquire)
    }
}
/// An [`AtomicLatch`] that additionally wakes a specific worker thread when
/// it is set, so a sleeping worker can observe the state change.
pub struct WakeLatch {
    // Underlying set/probe state.
    atomic_latch: AtomicLatch,
    // Pool owning the worker to wake; `'static` so the latch can be shared
    // across threads for the pool's lifetime.
    thread_pool: &'static ThreadPool,
    // Index of the worker thread to wake when the latch is set.
    thread_index: usize,
}
impl WakeLatch {
    /// Creates an unset latch that will wake `worker_thread` when set.
    #[inline]
    pub fn new(worker_thread: &WorkerThread) -> WakeLatch {
        let thread_pool = worker_thread.thread_pool();
        let thread_index = worker_thread.index();
        Self::new_raw(thread_index, thread_pool)
    }

    /// Creates an unset latch for the worker at `thread_index` of
    /// `thread_pool`, without requiring a `WorkerThread` handle.
    #[inline]
    pub const fn new_raw(thread_index: usize, thread_pool: &'static ThreadPool) -> WakeLatch {
        WakeLatch {
            atomic_latch: AtomicLatch::new(),
            thread_index,
            thread_pool,
        }
    }

    /// Returns the latch to the unset state so it can be reused.
    #[inline]
    pub fn reset(&self) {
        self.atomic_latch.reset();
    }
}
impl Latch for WakeLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        unsafe {
            // Copy everything we need out of `this` *before* setting the
            // inner latch: once it is set, a thread waiting on it may free
            // the latch, leaving `this` dangling.
            let thread_pool = (*this).thread_pool;
            let thread_index = (*this).thread_index;
            Latch::set(&(*this).atomic_latch);
            // Wake the worker using only the copies made above — `this`
            // must not be dereferenced past this point.
            thread_pool.wake_thread(thread_index);
        }
    }
}
impl Probe for WakeLatch {
    /// Returns `true` once the latch has been set; delegates to the inner
    /// [`AtomicLatch`].
    #[inline]
    fn probe(&self) -> bool {
        self.atomic_latch.probe()
    }
}
/// A latch that blocks the waiting thread until it is set, built from a
/// mutex-protected flag and a condition variable.
pub struct LockLatch {
    // `true` once the latch has been set.
    mutex: Mutex<bool>,
    // Signalled whenever the flag is set.
    cond: Condvar,
}
impl LockLatch {
    /// Creates a latch in the unset state.
    #[inline]
    pub const fn new() -> LockLatch {
        LockLatch {
            mutex: Mutex::new(false),
            cond: Condvar::new(),
        }
    }

    /// Blocks the calling thread until the latch is set.
    pub fn wait(&self) {
        let mut guard = self.mutex.lock();
        // `wait_while` re-checks the predicate on every wakeup, handling
        // spurious wakeups without a hand-written loop.
        self.cond.wait_while(&mut guard, |set| !*set);
    }

    /// Blocks until the latch is set, then — still under the lock — returns
    /// it to the unset state so it can be reused.
    pub fn wait_and_reset(&self) {
        let mut guard = self.mutex.lock();
        self.cond.wait_while(&mut guard, |set| !*set);
        *guard = false;
    }
}
impl Default for LockLatch {
fn default() -> Self {
Self::new()
}
}
impl Latch for LockLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        unsafe {
            // SAFETY: the caller guarantees `this` is valid. The flag is set
            // and waiters are notified while the lock is held, so a waiter
            // cannot miss the update between checking the flag and sleeping.
            let mut guard = (*this).mutex.lock();
            *guard = true;
            (*this).cond.notify_all();
        }
    }
}
/// A latch that fires only after being set `count` times: an atomic counter
/// layered over a [`WakeLatch`].
pub struct CountLatch {
    // Remaining number of `set` calls before the inner latch fires.
    counter: AtomicUsize,
    // Set (waking its worker) when the counter reaches zero.
    latch: WakeLatch,
}
impl CountLatch {
    /// Creates a latch that must be set `count` times before it fires,
    /// waking `owner` when it does.
    #[inline]
    pub fn with_count(count: usize, owner: &WorkerThread) -> Self {
        let latch = WakeLatch::new(owner);
        Self {
            counter: AtomicUsize::new(count),
            latch,
        }
    }

    /// Adds one more required `set` call before the latch fires.
    #[inline]
    pub fn increment(&self) {
        self.counter.fetch_add(1, Ordering::Relaxed);
    }
}
impl Latch for CountLatch {
    #[inline]
    unsafe fn set(this: *const Self) {
        unsafe {
            // Atomic RMW guarantees exactly one caller observes the
            // 1 -> 0 transition; only that caller sets the inner latch.
            // NOTE(review): decrementing past zero would wrap the usize —
            // callers must not `set` more times than the current count.
            if (*this).counter.fetch_sub(1, Ordering::SeqCst) == 1 {
                Latch::set(&(*this).latch);
            }
        }
    }
}
impl Probe for CountLatch {
    /// Returns `true` once the count has reached zero and the inner latch
    /// has been set.
    #[inline]
    fn probe(&self) -> bool {
        self.latch.probe()
    }
}
/// Adapter that turns any [`Latch`] into a [`Wake`] implementation: waking
/// the task sets the wrapped latch.
pub struct SetOnWake<L>
where
    L: Latch,
{
    latch: L,
}

impl<L> SetOnWake<L>
where
    L: Latch,
{
    /// Wraps `latch` in an `Arc`'d waker adapter.
    pub fn new(latch: L) -> Arc<Self> {
        Arc::new(SetOnWake { latch })
    }

    /// Returns a reference to the wrapped latch.
    pub fn latch(&self) -> &L {
        &self.latch
    }
}

impl<L> Wake for SetOnWake<L>
where
    L: Latch,
{
    fn wake(self: Arc<Self>) {
        self.wake_by_ref();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // SAFETY: the latch lives inside this `Arc`, which is kept alive by
        // the borrow for the duration of the call, so the pointer passed to
        // `set` is valid.
        unsafe { Latch::set(&self.latch) };
    }
}