use std::{os::unix::io::RawFd, sync::Arc};
use nix::sys::eventfd::{eventfd, EfdFlags};
use nix::unistd::{read, write};
use super::{CloseOnDrop, PingError};
use crate::{
generic::Generic, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};
// These two values are written to the eventfd by `send_ping`, so they act as
// amounts added to the kernel counter rather than as independent bit flags.

// Written once per `Ping::ping` call. Being even, any number of pings leaves
// the least significant bit of the counter untouched.
const INCREMENT_PING: u64 = 0x2;
// Written by `FlagOnDrop::drop`. `PingSource::process_events` tests the least
// significant bit of the drained counter against this value to detect that
// the sending side is gone; this relies on the close increment being written
// at most once between drains.
const INCREMENT_CLOSE: u64 = 0x1;
/// Creates a connected ping sender and ping event source.
///
/// Both halves are backed by a single non-blocking, close-on-exec eventfd
/// whose counter starts at zero.
///
/// # Errors
///
/// Returns the error reported by `eventfd` if the descriptor cannot be
/// created.
#[inline]
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
    let flags = EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK;
    let raw_fd = eventfd(0, flags)?;

    // The sender and the source use the very same descriptor, so its owner is
    // reference-counted and handed to both halves.
    // NOTE(review): `CloseOnDrop` is defined in the parent module; presumably
    // it closes the fd when the last reference is dropped -- confirm there.
    let owner = Arc::new(CloseOnDrop(raw_fd));

    // All clones of the `Ping` share this guard; its `Drop` impl notifies the
    // source once the last clone is gone.
    let close_guard = FlagOnDrop(Arc::clone(&owner));

    Ok((
        Ping {
            event: Arc::new(close_guard),
        },
        PingSource {
            event: Generic::new(raw_fd, Interest::READ, Mode::Level),
            _fd: owner,
        },
    ))
}
/// Writes `count` to `fd` as eight native-endian bytes.
///
/// `EAGAIN` is deliberately reported as success. On a non-blocking eventfd it
/// means the counter cannot absorb the increment (see eventfd(2)), which
/// implies earlier writes are still waiting to be read.
///
/// # Panics
///
/// Panics if `count` is zero.
///
/// # Errors
///
/// Returns any write error other than `EAGAIN`, converted to
/// `std::io::Error`.
#[inline]
fn send_ping(fd: RawFd, count: u64) -> std::io::Result<()> {
    assert!(count > 0);

    let payload = count.to_ne_bytes();
    match write(fd, &payload) {
        // Either the write went through or the counter is already saturated.
        Ok(_) | Err(nix::errno::Errno::EAGAIN) => Ok(()),
        // Everything else is a genuine failure.
        Err(other) => Err(other.into()),
    }
}
/// Reads one eight-byte value from `fd` and returns it as a native-endian
/// `u64`.
///
/// # Panics
///
/// Hits `unreachable!()` if the read succeeds with any length other than
/// eight bytes.
///
/// # Errors
///
/// Returns the read error converted to `std::io::Error`.
#[inline]
fn drain_ping(fd: RawFd) -> std::io::Result<u64> {
    // The value read from the descriptor is exactly one u64.
    const NBYTES: usize = 8;

    let mut raw = [0u8; NBYTES];
    let bytes_read = read(fd, &mut raw)?;

    // A successful read is expected to always fill the whole buffer, so no
    // retry loop is needed.
    if bytes_read != NBYTES {
        unreachable!();
    }

    Ok(u64::from_ne_bytes(raw))
}
/// Receiving half of a ping pair, created by `make_ping`.
///
/// Event handling is delegated to a `Generic` source that watches the raw
/// descriptor for readability.
#[derive(Debug)]
pub struct PingSource {
// Generic source wrapping the raw fd; all `EventSource` methods forward to it.
event: Generic<RawFd>,
// Shared owner of the descriptor, never read directly. Declared after `event`,
// so it is dropped after it.
// NOTE(review): presumably keeps the fd open for as long as this source
// lives -- confirm against `CloseOnDrop` in the parent module.
_fd: Arc<CloseOnDrop>,
}
impl EventSource for PingSource {
    type Event = ();
    type Metadata = ();
    type Ret = ();
    type Error = PingError;

    /// Drains the counter and translates it into a callback and a post-action.
    ///
    /// The callback runs at most once per drained value, no matter how many
    /// pings were accumulated in the counter. When the close bit is set the
    /// source asks to be removed; a ping and a close can both be reported by
    /// the same drained value.
    ///
    /// # Errors
    ///
    /// Any error from the underlying generic source or from draining the
    /// descriptor is wrapped in `PingError`.
    fn process_events<C>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: C,
    ) -> Result<PostAction, Self::Error>
    where
        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Every bit except the close bit: anything set here came from pings.
        const PING_MASK: u64 = !INCREMENT_CLOSE;

        let outcome = self.event.process_events(readiness, token, |_, &mut fd| {
            let counter = drain_ping(fd)?;
            let was_closed = (counter & INCREMENT_CLOSE) != 0;
            let was_pinged = (counter & PING_MASK) != 0;

            if was_pinged {
                callback((), &mut ());
            }

            Ok(if was_closed {
                PostAction::Remove
            } else {
                PostAction::Continue
            })
        });

        outcome.map_err(|e| PingError(e.into()))
    }

    /// Registers the wrapped generic source with `poll`.
    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
        self.event.register(poll, token_factory)
    }

    /// Re-registers the wrapped generic source with `poll`.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> crate::Result<()> {
        self.event.reregister(poll, token_factory)
    }

    /// Unregisters the wrapped generic source from `poll`.
    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
        self.event.unregister(poll)
    }
}
/// Sending half of a ping pair, created by `make_ping`.
///
/// Cloning only clones the inner `Arc`, so every clone refers to the same
/// `FlagOnDrop` guard and the same descriptor.
#[derive(Clone, Debug)]
pub struct Ping {
// Shared guard around the descriptor; its `Drop` impl runs when the last
// clone of this `Ping` is dropped.
event: Arc<FlagOnDrop>,
}
impl Ping {
    /// Sends a ping to the associated `PingSource`.
    ///
    /// This never returns an error: a failed write is only logged as a
    /// warning.
    pub fn ping(&self) {
        // Ping -> FlagOnDrop -> CloseOnDrop -> raw descriptor.
        let raw_fd = self.event.0 .0;

        match send_ping(raw_fd, INCREMENT_PING) {
            Ok(()) => {}
            Err(e) => log::warn!("[calloop] Failed to write a ping: {:?}", e),
        }
    }
}
/// Guard shared by all clones of a `Ping`; when it is dropped it writes
/// `INCREMENT_CLOSE` to the descriptor.
///
/// It holds its own reference to the shared `CloseOnDrop`, whose raw fd is
/// what the close increment is written to.
#[derive(Debug)]
struct FlagOnDrop(Arc<CloseOnDrop>);
impl Drop for FlagOnDrop {
    /// Signals the source that the sending side is gone.
    ///
    /// A failed write is only logged as a warning, since `drop` cannot report
    /// errors.
    fn drop(&mut self) {
        // FlagOnDrop -> CloseOnDrop -> raw descriptor.
        let raw_fd = self.0 .0;

        match send_ping(raw_fd, INCREMENT_CLOSE) {
            Ok(()) => {}
            Err(e) => log::warn!("[calloop] Failed to send close ping: {:?}", e),
        }
    }
}