use std::sync::{
atomic::{AtomicPtr, Ordering},
Arc,
};
use crossbeam_channel::{unbounded, Receiver, Sender};
use wg::WaitGroup;
/// Owner of the sending half of the cancellation channel.
///
/// The sender is kept behind a raw pointer inside an [`AtomicPtr`] so that it
/// can be taken out and dropped through a shared reference (`&self`).
#[derive(Debug)]
struct Canceler {
/// Type-erased pointer to a heap-allocated `Sender<()>` (created with
/// `Box::into_raw` in `CancelContext::new`); swapped to null by `cancel`.
tx: AtomicPtr<()>,
}
impl Canceler {
    /// Takes the boxed sender out of `tx` and drops it.
    ///
    /// The pointer is replaced with null in a single atomic swap, so only the
    /// first call (from any thread) finds a sender to drop; every later call
    /// sees null and returns without doing anything.
    #[inline]
    fn cancel(&self) {
        let sender_ptr = self
            .tx
            .swap(std::ptr::null_mut(), Ordering::AcqRel)
            .cast::<Sender<()>>();

        // Already cancelled: nothing left to release.
        if sender_ptr.is_null() {
            return;
        }

        // SAFETY: in the code visible here, the only non-null value ever stored
        // in `tx` is the pointer returned by `Box::into_raw` on a
        // `Box<Sender<()>>` in `CancelContext::new`. The swap above handed that
        // pointer to this caller alone and left null behind, so the box is
        // rebuilt and freed at most once.
        drop(unsafe { Box::from_raw(sender_ptr) });
    }
}
impl Drop for Canceler {
fn drop(&mut self) {
self.cancel();
}
}
/// Receiving half of the cancellation channel created by `CancelContext::new`.
///
/// Its counterpart is the [`Canceler`] returned alongside it, which holds the
/// sender.
#[derive(Debug)]
#[repr(transparent)]
struct CancelContext {
/// Receiver of the cancellation channel; cloned and handed out by `done`.
rx: Receiver<()>,
}
impl CancelContext {
fn new() -> (Self, Canceler) {
let (tx, rx) = unbounded();
(
Self { rx },
Canceler {
tx: AtomicPtr::new(Box::into_raw(Box::new(tx)) as _),
},
)
}
#[inline]
fn done(&self) -> Receiver<()> {
self.rx.clone()
}
}
/// Handle that combines a cancellation signal with a wait group.
///
/// `signal` triggers the cancellation side, `listen` hands out a receiver for
/// it, and `add_running` / `done` / `wait` forward to the wait group.
/// Cloning is cheap: every clone points at the same shared state through an
/// [`Arc`].
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct Closer {
/// State shared by all clones of this handle.
inner: Arc<CloserInner>,
}
/// Shared state behind a [`Closer`].
#[derive(Debug)]
struct CloserInner {
/// Wait group driven by `Closer::add_running`, `Closer::done` and `Closer::wait`.
wg: WaitGroup,
/// Receiving side of the cancellation channel, exposed through `Closer::listen`.
ctx: CancelContext,
/// Sending side of the cancellation channel, triggered by `Closer::signal`.
cancel: Canceler,
}
impl CloserInner {
#[inline]
fn new() -> Self {
let (ctx, cancel) = CancelContext::new();
Self {
wg: WaitGroup::new(),
ctx,
cancel,
}
}
#[inline]
fn with(initial: usize) -> Self {
let (ctx, cancel) = CancelContext::new();
Self {
wg: WaitGroup::from(initial),
ctx,
cancel,
}
}
}
impl Default for Closer {
    /// Creates a closer whose shared state comes from `CloserInner::new()`.
    fn default() -> Self {
        let inner = Arc::new(CloserInner::new());
        Self { inner }
    }
}
impl Closer {
    /// Creates a closer whose wait group is built from `initial`.
    #[inline]
    pub fn new(initial: usize) -> Self {
        let inner = Arc::new(CloserInner::with(initial));
        Self { inner }
    }

    /// Adds `running` to the wait group.
    #[inline]
    pub fn add_running(&self, running: usize) {
        let wg = &self.inner.wg;
        wg.add(running);
    }

    /// Calls `done` on the wait group, marking one registered task finished.
    #[inline]
    pub fn done(&self) {
        let wg = &self.inner.wg;
        wg.done();
    }

    /// Triggers cancellation by dropping the channel's sender.
    ///
    /// Only the first call has an effect; later calls are no-ops. Nothing is
    /// ever sent on the channel in the code visible here, so listeners are
    /// expected to observe the channel becoming disconnected (crossbeam-channel
    /// semantics once no sender remains).
    #[inline]
    pub fn signal(&self) {
        let canceler = &self.inner.cancel;
        canceler.cancel();
    }

    /// Blocks in the wait group's `wait` call.
    #[inline]
    pub fn wait(&self) {
        let wg = &self.inner.wg;
        wg.wait();
    }

    /// Triggers cancellation (as `signal` does) and then blocks (as `wait` does).
    #[inline]
    pub fn signal_and_wait(&self) {
        let shared = &*self.inner;
        shared.cancel.cancel();
        shared.wg.wait();
    }

    /// Returns a receiver for the cancellation channel.
    ///
    /// Each call yields a fresh clone of the same underlying receiver.
    pub fn listen(&self) -> Receiver<()> {
        let ctx = &self.inner.ctx;
        ctx.done()
    }
}