use std::{os::unix::io::RawFd, sync::Arc};
use nix::{
fcntl::OFlag,
unistd::{close, pipe2, read, write},
};
use super::generic::{Fd, Generic};
use crate::{
no_nix_err, EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};
/// Creates a connected [`Ping`] / [`PingSource`] pair backed by a pipe.
///
/// The pipe is opened with `O_CLOEXEC | O_NONBLOCK`. Its read end is wrapped
/// in a [`Generic`] source registered for `Interest::READ` in `Mode::Level`
/// and handed to the returned [`PingSource`]; its write end is stored behind
/// an `Arc` in the returned [`Ping`] so the handle can be cloned.
///
/// # Errors
///
/// Returns the error reported by `pipe2`, converted through `no_nix_err`.
pub fn make_ping() -> std::io::Result<(Ping, PingSource)> {
    let (reader_fd, writer_fd) =
        pipe2(OFlag::O_CLOEXEC | OFlag::O_NONBLOCK).map_err(no_nix_err)?;

    Ok((
        Ping {
            pipe: Arc::new(CloseOnDrop(writer_fd)),
        },
        PingSource {
            pipe: Generic::from_fd(reader_fd, Interest::READ, Mode::Level),
        },
    ))
}
/// The receiving half of a ping pair created by [`make_ping`].
///
/// It implements [`EventSource`] with `()` as event, metadata and return
/// type: the callback carries no payload, it only signals that at least one
/// ping was received.
#[derive(Debug)]
pub struct PingSource {
// Read end of the pipe, wrapped in a `Generic` source (see `make_ping`).
pipe: Generic<Fd>,
}
impl EventSource for PingSource {
    type Event = ();
    type Metadata = ();
    type Ret = ();

    /// Drains the pipe and notifies `callback` if any ping was pending.
    ///
    /// The pipe is read in 32-byte chunks until the read would block, so any
    /// number of pings written since the last call results in at most one
    /// invocation of `callback`.
    ///
    /// Returns `PostAction::Remove` when a read returns 0 bytes (end-of-file
    /// on the pipe), and `PostAction::Continue` otherwise.
    ///
    /// # Errors
    ///
    /// Any read error other than `WouldBlock` is returned immediately, in
    /// which case `callback` is not invoked even if data was read earlier in
    /// the same call.
    fn process_events<C>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: C,
    ) -> std::io::Result<PostAction>
    where
        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        self.pipe.process_events(readiness, token, |_, fd| {
            let mut scratch = [0u8; 32];
            let mut got_ping = false;

            // The loop evaluates to the action to report once the pipe has
            // been emptied (or found closed).
            let action = loop {
                match read(fd.0, &mut scratch) {
                    // End-of-file: nothing more will ever arrive.
                    Ok(0) => break PostAction::Remove,
                    // One or more ping bytes consumed; keep draining.
                    Ok(_) => got_ping = true,
                    Err(err) => {
                        let err = no_nix_err(err);
                        if err.kind() != std::io::ErrorKind::WouldBlock {
                            return Err(err);
                        }
                        // Pipe is empty for now.
                        break PostAction::Continue;
                    }
                }
            };

            if got_ping {
                callback((), &mut ());
            }
            Ok(action)
        })
    }

    /// Registers the underlying pipe source with `poll`.
    fn register(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> std::io::Result<()> {
        self.pipe.register(poll, token_factory)
    }

    /// Re-registers the underlying pipe source with `poll`.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> std::io::Result<()> {
        self.pipe.reregister(poll, token_factory)
    }

    /// Unregisters the underlying pipe source from `poll`.
    fn unregister(&mut self, poll: &mut Poll) -> std::io::Result<()> {
        self.pipe.unregister(poll)
    }
}
impl Drop for PingSource {
    /// Closes the read end of the pipe.
    ///
    /// A failure to close is only logged as a warning; it cannot be
    /// propagated from `drop`.
    fn drop(&mut self) {
        let read_fd = self.pipe.file.0;
        match close(read_fd) {
            Ok(()) => {}
            Err(err) => {
                log::warn!("[calloop] Failed to close read ping: {:?}", err);
            }
        }
    }
}
/// The sending half of a ping pair created by [`make_ping`].
///
/// The handle is cheap to clone: all clones share the same write end of the
/// pipe through an `Arc`, and that file descriptor is closed when the last
/// clone is dropped.
#[derive(Clone, Debug)]
pub struct Ping {
// Shared write end of the pipe; closed by `CloseOnDrop` once unreferenced.
pipe: Arc<CloseOnDrop>,
}
impl Ping {
    /// Sends a ping by writing a single zero byte to the pipe.
    ///
    /// This never fails from the caller's point of view: a write error is
    /// logged as a warning and otherwise ignored.
    pub fn ping(&self) {
        let write_fd = self.pipe.0;
        match write(write_fd, &[0u8]) {
            Ok(_) => {}
            Err(err) => {
                log::warn!("[calloop] Failed to write a ping: {:?}", err);
            }
        }
    }
}
/// Owner of a raw file descriptor that is closed when the value is dropped.
///
/// Used to hold the write end of the ping pipe inside an `Arc`, so the
/// descriptor is closed once no `Ping` handle refers to it any more.
#[derive(Debug)]
struct CloseOnDrop(RawFd);
impl Drop for CloseOnDrop {
    /// Closes the wrapped file descriptor.
    ///
    /// A failure to close is only logged as a warning; it cannot be
    /// propagated from `drop`.
    fn drop(&mut self) {
        let owned_fd = self.0;
        match close(owned_fd) {
            Ok(()) => {}
            Err(err) => {
                log::warn!("[calloop] Failed to close write ping: {:?}", err);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{Duration, Instant};

    /// A ping sent before dispatching triggers the callback on the next
    /// dispatch, and not again on the one after it.
    #[test]
    fn ping() {
        let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();
        let (ping, source) = make_ping().unwrap();

        event_loop
            .handle()
            .insert_source(source, |(), &mut (), flag| *flag = true)
            .unwrap();

        ping.ping();

        let mut fired = false;
        event_loop
            .dispatch(Duration::from_millis(0), &mut fired)
            .unwrap();
        assert!(fired);

        // The pipe was drained by the previous dispatch, so nothing fires.
        fired = false;
        event_loop
            .dispatch(Duration::from_millis(0), &mut fired)
            .unwrap();
        assert!(!fired);
    }

    /// When the only `Ping` handle is dropped before dispatching, the
    /// callback is never invoked and a later dispatch waits for its full
    /// timeout.
    #[test]
    fn ping_closed() {
        let mut event_loop = crate::EventLoop::<bool>::try_new().unwrap();
        let (ping, source) = make_ping().unwrap();
        // Close the write end right away, before the source is inserted.
        drop(ping);

        event_loop
            .handle()
            .insert_source(source, |(), &mut (), flag| *flag = true)
            .unwrap();

        let mut fired = false;
        event_loop
            .dispatch(Duration::from_millis(0), &mut fired)
            .unwrap();
        assert!(!fired);

        // The source reported `PostAction::Remove` on the previous dispatch;
        // this one is expected to have nothing to wake it up early.
        let started = Instant::now();
        event_loop
            .dispatch(Duration::from_millis(100), &mut fired)
            .unwrap();
        assert!(started.elapsed() >= Duration::from_millis(100));
    }
}