use crate::Signal;
use atomic_waker::AtomicWaker;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
/// Notification source backed by a shared atomic counter and a waker.
///
/// Callbacks produced by `add_signal` bump the counter; `poll_next` drains it
/// one notification at a time.
#[derive(Debug)]
pub struct Notifier {
/// State shared between this notifier and every callback returned by `add_signal`.
pipe: Arc<Pipe>,
}
impl Notifier {
pub(super) fn new() -> io::Result<Self> {
Ok(Self {
pipe: Arc::new(Pipe {
count: AtomicUsize::new(0),
waker: AtomicWaker::new(),
}),
})
}
pub(super) fn add_signal(
&mut self,
_signal: Signal,
) -> io::Result<impl Fn() + Send + Sync + 'static> {
let pipe = self.pipe.clone();
Ok(move || {
pipe.push();
})
}
pub(super) fn remove_signal(&mut self, _signal: Signal) -> io::Result<()> {
Ok(())
}
pub(super) fn poll_next(&self, cx: &mut Context<'_>) -> Poll<io::Result<Signal>> {
let mut count = self.pipe.count.load(Ordering::SeqCst);
let mut registered = false;
loop {
if count > 0 {
let new_count = count - 1;
if let Err(new_count) = self.pipe.count.compare_exchange(
count,
new_count,
Ordering::SeqCst,
Ordering::SeqCst,
) {
count = new_count;
continue;
}
return Poll::Ready(Ok(Signal::Int));
}
if registered {
return Poll::Pending;
} else {
registered = true;
self.pipe.waker.register(cx.waker());
count = self.pipe.count.load(Ordering::SeqCst);
}
}
}
}
/// Shared state connecting notification callbacks to the polling side.
#[derive(Debug)]
struct Pipe {
/// Number of notifications pushed but not yet consumed by `Notifier::poll_next`.
count: AtomicUsize,
/// Waker registered by `Notifier::poll_next` and woken on every `push`.
waker: AtomicWaker,
}
impl Pipe {
/// Records one pending notification and wakes the task registered on `waker`, if any.
fn push(&self) {
// Increment before waking so the woken poller observes a non-zero count.
self.count.fetch_add(1, Ordering::SeqCst);
self.waker.wake();
}
}