use io_uring::{opcode, types};
use libc::{eventfd, EFD_NONBLOCK, EFD_SEMAPHORE};
use std::{
ffi::c_void,
future::Future,
io::{ErrorKind, Result},
os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd},
pin::Pin,
task::{Context, Poll},
};
use crate::reactor::{Reactor, ReactorIo};
pub struct Event {
inner: OwnedFd,
}
pub struct EventWaiter<'fd> {
inner: BorrowedFd<'fd>,
io: ReactorIo,
wait_buf: [u8; std::mem::size_of::<u64>()],
}
impl Event {
pub fn new() -> Result<Self> {
let fd = unsafe { eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE) };
if fd == -1 {
Err(std::io::Error::last_os_error())?;
}
Ok(Self {
inner: unsafe { OwnedFd::from_raw_fd(fd) },
})
}
pub fn notify_one(&self) -> Result<()> {
let buffer = 1_u64.to_ne_bytes();
let ret = unsafe {
libc::write(
self.inner.as_raw_fd(),
buffer.as_ptr() as *const c_void,
buffer.len(),
)
};
if ret == -1 {
Err(std::io::Error::last_os_error())?
}
if ret as usize != buffer.len() {
return Err(std::io::Error::new(
ErrorKind::UnexpectedEof,
"Failed to write entire event fd buffer",
));
}
Ok(())
}
pub fn wait(&mut self) -> EventWaiter<'_> {
EventWaiter {
inner: self.inner.as_fd(),
io: Reactor::new_io(),
wait_buf: [0; std::mem::size_of::<u64>()],
}
}
}
impl Clone for Event {
fn clone(&self) -> Self {
Self {
inner: self.inner.try_clone().unwrap(),
}
}
}
impl Future for EventWaiter<'_> {
type Output = Result<()>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
this.io
.submit_or_get_result(|| {
(
opcode::Read::new(
types::Fd(this.inner.as_raw_fd()),
this.wait_buf.as_mut_ptr(),
this.wait_buf.len() as _,
)
.build(),
ctx.waker().clone(),
)
})
.map(|x| x.map(|_| ()))
}
}
#[cfg(test)]
mod tests {
use std::thread;
use super::Event;
use crate::task::Executor;
#[test]
fn simple() {
Executor::block_on(async {
let evt = Event::new().unwrap();
let task = {
let mut evt = evt.clone();
Executor::spawn(async move {
evt.wait().await.unwrap();
})
};
evt.notify_one().unwrap();
task.await;
});
}
#[test]
fn multi_notifiers() {
Executor::block_on(async {
let evt = Event::new().unwrap();
let task = {
let mut evt = evt.clone();
Executor::spawn(async move {
let mut count = 0;
while count < 40 {
evt.wait().await.unwrap();
count += 1;
}
})
};
let t1 = {
let evt = evt.clone();
thread::spawn(move || {
for _ in 0..20 {
evt.notify_one().unwrap();
}
})
};
let t2 = {
let evt = evt.clone();
thread::spawn(move || {
for _ in 0..20 {
evt.notify_one().unwrap();
}
})
};
t1.join().unwrap();
t2.join().unwrap();
task.await;
});
}
}