pub mod pair;
use std::{
collections::HashMap,
future::Future,
num::NonZeroUsize,
pin::Pin,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
sync::Arc,
task::{Context, Poll, Waker}
};
use parking_lot::Mutex;
/// Waker registries guarded by the mutex in `Shared`.
///
/// Both maps are keyed by ids handed out by the shared id counter.
struct State {
    /// Wakers of `WaitFuture`s that are pending until the switch is triggered.
    waiting: HashMap<usize, Waker>,

    /// Wakers of `FinalizedFuture`s that are pending until `waiting` is empty.
    final_wait: HashMap<usize, Waker>,
}
/// State shared (behind an `Arc`) by a `KillSwitch`, its clones and the
/// futures it hands out.
struct Shared {
    /// Counter used to hand out registration ids for the waker maps.
    id: AtomicUsize,

    /// `true` once the switch has been triggered and not yet reset.
    triggered: AtomicBool,

    /// Registered wakers, protected by a mutex.
    state: Mutex<State>,
}
impl Shared {
#[inline]
fn id(&self) -> usize {
self.id.fetch_add(1, Ordering::SeqCst)
}
}
pub struct KillSwitch(Arc<Shared>);
impl KillSwitch {
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn trigger(&self) {
self.0.triggered.store(true, Ordering::SeqCst);
let mut state = self.0.state.lock();
for (_, waker) in state.waiting.drain() {
waker.wake();
}
}
#[inline]
pub fn wait(&self) -> WaitFuture {
WaitFuture {
ctx: Arc::clone(&self.0),
id: None
}
}
pub fn finalize(&self) -> FinalizedFuture {
FinalizedFuture {
ctx: Arc::clone(&self.0),
id: None
}
}
pub fn reset(&self) {
self.0.triggered.store(false, Ordering::SeqCst);
}
pub async fn finalize_reset(&self) -> Result<(), ()> {
if (self.finalize().await).is_ok() {
self.reset();
Ok(())
} else {
Err(())
}
}
pub fn is_triggered(&self) -> bool {
self.0.triggered.load(Ordering::SeqCst)
}
}
impl Default for KillSwitch {
    /// Builds an untriggered switch with no registered wakers and the id
    /// counter starting at `1`.
    fn default() -> Self {
        let registries = State {
            waiting: HashMap::new(),
            final_wait: HashMap::new(),
        };
        let shared = Shared {
            id: AtomicUsize::new(1),
            triggered: AtomicBool::new(false),
            state: Mutex::new(registries),
        };
        Self(Arc::new(shared))
    }
}
impl Clone for KillSwitch {
fn clone(&self) -> KillSwitch {
KillSwitch(Arc::clone(&self.0))
}
}
/// Future that completes once the kill switch it was created from is
/// triggered.
pub struct WaitFuture {
    /// State shared with the originating switch.
    ctx: Arc<Shared>,

    /// Key of this future's entry in the waiter map; `None` until the future
    /// has returned `Poll::Pending` at least once.
    id: Option<NonZeroUsize>,
}
impl Future for WaitFuture {
    type Output = ();

    /// Resolves to `()` as soon as the switch is observed as triggered;
    /// otherwise stores the task's waker in the waiter map and stays pending.
    fn poll(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>
    ) -> Poll<Self::Output> {
        // Fast path: no need to take the lock once the switch has fired.
        if self.ctx.triggered.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }

        let mut state = self.ctx.state.lock();
        // Re-check under the lock: `trigger` sets the flag before it locks
        // the state to wake waiters, so a flag that is still clear here means
        // the waker stored below will be seen by any later `trigger`.
        if self.ctx.triggered.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }

        // Reuse the id from an earlier poll so that a re-poll replaces this
        // future's waker instead of adding a second entry. `Drop` removes
        // only `self.id`, so any extra entry would stay in the map forever
        // and keep it from ever becoming empty.
        let id = match self.id {
            Some(id) => id,
            None => loop {
                // Skip `0` (not representable) and ids that are still in use,
                // both of which can only show up after the counter wraps.
                if let Some(candidate) = NonZeroUsize::new(self.ctx.id()) {
                    if !state.waiting.contains_key(&candidate.get()) {
                        break candidate;
                    }
                }
            },
        };
        state.waiting.insert(id.get(), ctx.waker().clone());
        drop(state);

        self.id = Some(id);
        Poll::Pending
    }
}
impl Drop for WaitFuture {
fn drop(&mut self) {
if let Some(id) = self.id {
let mut state = self.ctx.state.lock();
state.waiting.remove(&id.get());
if state.waiting.is_empty() {
for (_, waker) in state.final_wait.drain() {
waker.wake();
}
}
}
}
}
/// Future that reports whether a triggered kill switch has no waiter
/// registrations left.
pub struct FinalizedFuture {
    /// State shared with the originating switch.
    ctx: Arc<Shared>,

    /// Key of this future's entry in the finalizer map; `None` until the
    /// future has returned `Poll::Pending` at least once.
    id: Option<NonZeroUsize>,
}
impl Future for FinalizedFuture {
    type Output = Result<(), ()>;

    /// Resolves to `Err(())` if the switch is not triggered, to `Ok(())` if
    /// it is triggered and the waiter map is empty, and otherwise stores the
    /// task's waker in the finalizer map and stays pending.
    fn poll(
        mut self: Pin<&mut Self>,
        ctx: &mut Context<'_>
    ) -> Poll<Self::Output> {
        if !self.ctx.triggered.load(Ordering::SeqCst) {
            return Poll::Ready(Err(()));
        }

        let mut state = self.ctx.state.lock();
        if state.waiting.is_empty() {
            return Poll::Ready(Ok(()));
        }

        // Reuse the id from an earlier poll so that a re-poll replaces this
        // future's waker instead of leaving a stale entry behind; `Drop`
        // removes only `self.id`.
        let id = match self.id {
            Some(id) => id,
            None => loop {
                // The id must be free in the map it is inserted into, i.e.
                // `final_wait`. `0` and in-use ids can only show up after
                // the counter wraps.
                if let Some(candidate) = NonZeroUsize::new(self.ctx.id()) {
                    if !state.final_wait.contains_key(&candidate.get()) {
                        break candidate;
                    }
                }
            },
        };
        state.final_wait.insert(id.get(), ctx.waker().clone());
        drop(state);

        self.id = Some(id);
        Poll::Pending
    }
}
impl Drop for FinalizedFuture {
    /// Removes this future's entry from the finalizer map, if it ever
    /// registered one.
    fn drop(&mut self) {
        let key = match self.id {
            Some(id) => id.get(),
            // Never returned `Poll::Pending`, so nothing was registered.
            None => return,
        };
        let mut guard = self.ctx.state.lock();
        guard.final_wait.remove(&key);
    }
}