use super::AsyncSignaler;
#[cfg(unix)]
use super::os;
use std::future::Future;
use std::io;
/// Signaler that delegates to the `Inner` backend type alias, which is
/// selected per target platform with `cfg` attributes.
pub(crate) struct TokioSignaler {
/// Platform backend that performs the actual signalling and waiting.
inner: Inner,
}
// NOTE(review): the aliases below cover Linux, other Unix targets and Windows;
// no `Inner` alias is visible here for any other target.
/// Backend used when compiling for Linux.
#[cfg(target_os = "linux")]
type Inner = EventFdInner;
/// Backend used when compiling for Unix targets other than Linux.
#[cfg(all(unix, not(target_os = "linux")))]
type Inner = PipeInner;
/// Backend used when compiling for Windows.
#[cfg(windows)]
type Inner = NotifyInner;
impl TokioSignaler {
    /// Builds a signaler around a freshly constructed platform backend.
    ///
    /// # Errors
    ///
    /// Returns whatever `io::Error` the backend constructor (`Inner::new`)
    /// reports.
    pub(crate) fn new() -> io::Result<Self> {
        let inner = Inner::new()?;
        Ok(Self { inner })
    }
}
impl AsyncSignaler for TokioSignaler {
#[inline]
fn signal(&self) {
self.inner.signal();
}
fn signaled(&self) -> impl Future<Output = ()> + Send + '_ {
self.inner.signaled()
}
}
/// Linux backend: wraps the descriptor produced by `os::make_eventfd` in a
/// tokio `AsyncFd`.
#[cfg(target_os = "linux")]
pub(crate) struct EventFdInner {
/// Descriptor from `os::make_eventfd`, registered with tokio for
/// `Interest::READABLE` in `new`.
fd: ::tokio::io::unix::AsyncFd<std::os::fd::OwnedFd>,
}
#[cfg(target_os = "linux")]
impl EventFdInner {
    /// Creates the descriptor via `os::make_eventfd` and registers it with
    /// tokio for read readiness.
    ///
    /// # Errors
    ///
    /// Propagates the error from `os::make_eventfd` or from
    /// `AsyncFd::with_interest`.
    fn new() -> io::Result<Self> {
        use ::tokio::io::unix::AsyncFd;
        use ::tokio::io::Interest;

        let eventfd = os::make_eventfd()?;
        AsyncFd::with_interest(eventfd, Interest::READABLE).map(|fd| Self { fd })
    }

    /// Writes to the descriptor through `os::signal_write_eventfd`.
    ///
    /// The result of the write is intentionally discarded.
    fn signal(&self) {
        use std::os::fd::AsFd;

        let eventfd = self.fd.get_ref();
        let _ = os::signal_write_eventfd(eventfd.as_fd());
    }

    /// Waits until the descriptor is reported readable and has been drained.
    ///
    /// Returns when `readable()` fails, or when `os::drain_eventfd` yields
    /// anything other than a `WouldBlock` error. On `WouldBlock` the cached
    /// readiness is cleared and the wait starts over.
    async fn signaled(&self) {
        use std::os::fd::AsFd;

        loop {
            let Ok(mut readiness) = self.fd.readable().await else {
                return;
            };

            let drained = os::drain_eventfd(self.fd.get_ref().as_fd());
            let would_block =
                matches!(&drained, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return;
            }

            // Nothing was available after all: reset readiness and wait again.
            readiness.clear_ready();
        }
    }
}
/// Backend for Unix targets other than Linux: holds the two descriptors
/// returned by `os::make_pipe`.
#[cfg(all(unix, not(target_os = "linux")))]
pub(crate) struct PipeInner {
/// First descriptor from `os::make_pipe` (presumably the read end),
/// registered with tokio for `Interest::READABLE` and drained in `signaled`.
rx: ::tokio::io::unix::AsyncFd<std::os::fd::OwnedFd>,
/// Second descriptor from `os::make_pipe` (presumably the write end),
/// written to by `signal`.
tx: std::sync::Arc<std::os::fd::OwnedFd>,
}
#[cfg(all(unix, not(target_os = "linux")))]
impl PipeInner {
    /// Creates the descriptor pair via `os::make_pipe` and registers the
    /// first one with tokio for read readiness.
    ///
    /// # Errors
    ///
    /// Propagates the error from `os::make_pipe` or from
    /// `AsyncFd::with_interest`.
    fn new() -> io::Result<Self> {
        use ::tokio::io::unix::AsyncFd;
        use ::tokio::io::Interest;

        let (read_end, write_end) = os::make_pipe()?;
        let rx = AsyncFd::with_interest(read_end, Interest::READABLE)?;
        let tx = std::sync::Arc::new(write_end);
        Ok(Self { rx, tx })
    }

    /// Writes to the `tx` descriptor through `os::signal_write_pipe`.
    ///
    /// The result of the write is intentionally discarded.
    fn signal(&self) {
        use std::os::fd::AsFd;

        let writer = self.tx.as_fd();
        let _ = os::signal_write_pipe(writer);
    }

    /// Waits until the `rx` descriptor is reported readable and has been
    /// drained.
    ///
    /// Returns when `readable()` fails, or when `os::drain_pipe` yields
    /// anything other than a `WouldBlock` error. On `WouldBlock` the cached
    /// readiness is cleared and the wait starts over.
    async fn signaled(&self) {
        use std::os::fd::AsFd;

        loop {
            let Ok(mut readiness) = self.rx.readable().await else {
                return;
            };

            let drained = os::drain_pipe(self.rx.get_ref().as_fd());
            let would_block =
                matches!(&drained, Err(err) if err.kind() == io::ErrorKind::WouldBlock);
            if !would_block {
                return;
            }

            // Nothing was available after all: reset readiness and wait again.
            readiness.clear_ready();
        }
    }
}
/// Windows backend: signalling is done through a tokio `Notify` rather than
/// an OS descriptor.
#[cfg(windows)]
pub(crate) struct NotifyInner {
/// Shared `Notify` that `signal` notifies and `signaled` awaits.
notify: std::sync::Arc<::tokio::sync::Notify>,
}
#[cfg(windows)]
impl NotifyInner {
    /// Creates the backend with a fresh `Notify` behind an `Arc`.
    ///
    /// This constructor never returns an error; it returns `io::Result`
    /// like the other backends' constructors.
    fn new() -> io::Result<Self> {
        let notify = std::sync::Arc::new(::tokio::sync::Notify::new());
        Ok(Self { notify })
    }

    /// Calls `Notify::notify_one` on the shared `Notify`.
    fn signal(&self) {
        ::tokio::sync::Notify::notify_one(&self.notify);
    }

    /// Completes once the `Notify::notified` future resolves.
    async fn signaled(&self) {
        let notification = self.notify.notified();
        notification.await;
    }
}