use std::os::fd::{IntoRawFd, RawFd};
use std::io;
use std::fs::File as StdFile;
use std::path::Path;
use std::task::Waker;
use std::task::Poll;
use std::task::Context;
use std::sync::{OnceLock, atomic::{AtomicBool, Ordering}};
use std::sync::mpsc::{channel, Sender};
use std::cell::UnsafeCell;
use std::ffi::{c_void, c_int};
use std::pin::Pin;
use std::collections::HashMap;
use log::*;
use libc::{O_ASYNC, O_NONBLOCK};
mod compat;
mod convert;
mod gen;
use crate::gen::siginfo_t;
/// The `F_SETSIG` `fcntl` command value, taken from the crate's `gen` module
/// and converted to `c_int` so it can be passed to `libc::fcntl`.
const F_SETSIG: c_int = crate::gen::F_SETSIG as c_int;
/// Converts a C-style return value into an [`io::Result`].
///
/// A value equal to `-1` is treated as failure and replaced by the error
/// reported by [`io::Error::last_os_error`]; any other value is returned
/// unchanged inside `Ok`.
fn c_err<T: From<i8> + PartialEq>(retval: T) -> io::Result<T> {
    // `-1` expressed in the caller's integer type.
    let failure = T::from(-1i8);
    if retval != failure {
        return Ok(retval);
    }
    Err(io::Error::last_os_error())
}
/// Messages delivered over `CHAN` to the manager thread spawned by `init`.
#[derive(Debug)]
enum Event {
/// Wake every waker currently queued for this descriptor.
/// Sent by the signal handler when `si_code` is not `0x80`.
Wakeup(RawFd),
/// Wake the wakers queued on all descriptors.
/// Sent by the signal handler when `si_code` is `0x80`.
WakeAll,
/// Queue a waker for this descriptor.
/// Sent by `poll_io` when an operation reported `WouldBlock`.
Queue(RawFd, Waker),
/// Wake any wakers still queued for this descriptor and remove its entry.
/// Sent from `File`'s `Drop` implementation.
Drop(RawFd),
}
/// Sending half of the channel read by the manager thread; set once by `init`.
static CHAN: OnceLock<Sender<Event>> = OnceLock::new();
/// Process id obtained from `getpid()` while `init` installed the handlers.
static HANDLER_PID: OnceLock<libc::pid_t> = OnceLock::new();
/// Signal number (`SIGRTMIN + 3`) chosen by `init`; `File::from_raw` passes it
/// to `F_SETSIG`.
static SIGNAL: OnceLock<c_int> = OnceLock::new();
/// Set by the first call to `init`; later calls see `true` and return at once.
static INIT_STARTED: AtomicBool = AtomicBool::new(false);
pub fn init() -> io::Result<()> {
if INIT_STARTED.swap(true, Ordering::Relaxed) {
return Ok(());
}
let (snd, rcv) = channel();
std::thread::spawn(move || {
let mut m: HashMap<RawFd, Vec<Waker>> = HashMap::new();
while let Ok(ev) = rcv.recv() {
trace!("got event {ev:?}");
match ev {
Event::Queue(fd, waker) => {
trace!("queuing waker on fd {fd}");
let wakers = m.entry(fd).or_default();
wakers.push(waker);
}
Event::WakeAll =>
m.iter_mut()
.for_each(|(_, v)| v.drain(..).for_each(Waker::wake)),
Event::Wakeup(fd) | Event::Drop(fd) => {
if let Some(wakers) = m.get_mut(&fd) {
trace!("waking up {} futures on fd {fd}", wakers.len());
wakers.drain(..).for_each(Waker::wake);
if let Event::Drop(_) = ev {
m.remove(&fd);
}
}
}
}
}
log::error!("sigio manager thread exited");
});
CHAN.set(snd).expect("init called more than once");
unsafe {
let signo = libc::SIGRTMIN() + 3;
let mut sset = std::mem::zeroed();
libc::sigfillset(&mut sset);
trace!("installing signal handler for signal {signo}");
let act = libc::sigaction{
sa_flags: libc::SA_SIGINFO,
sa_mask: sset.clone(),
sa_sigaction: handler as extern "C" fn(_, _, _) as libc::sighandler_t,
sa_restorer: None,
};
libc::sigaction(signo, &act, std::ptr::null_mut());
libc::sigaction(libc::SIGIO, &act, std::ptr::null_mut());
let pid = libc::getpid();
HANDLER_PID.set(pid).unwrap();
SIGNAL.set(signo).unwrap();
}
Ok(())
}
/// Signal handler installed by `init` for the real-time signal and `SIGIO`.
///
/// Reads the descriptor and `si_code` from `info`, then forwards either
/// [`Event::WakeAll`] (when `si_code` is `0x80`) or [`Event::Wakeup`] for that
/// descriptor to the manager thread.
///
/// # Panics
///
/// Panics if `CHAN` has not been set or if the manager thread's receiver is
/// gone.
// NOTE(review): `0x80` is presumably Linux's `SI_KERNEL`, i.e. the kernel fell
// back to plain `SIGIO` without per-descriptor information — confirm.
// NOTE(review): logging and channel sends are not generally async-signal-safe;
// verify this is acceptable for the targets in use.
extern "C" fn handler(signal: c_int, info: &siginfo_t, _: *const ()) {
    let (fd, code) = (info.si_fd(), info.si_code);
    log::trace!("got signal {signal} for fd {fd} (si_code {code})");
    let ev = match code {
        0x80 => Event::WakeAll,
        _ => Event::Wakeup(fd),
    };
    let chan = CHAN.get().unwrap();
    chan.send(ev).unwrap();
}
/// A file descriptor that `File::from_raw` has switched to `O_ASYNC |
/// O_NONBLOCK` mode and pointed at the signal chosen by `init`.
pub struct File {
/// The underlying raw descriptor.
fd: RawFd,
/// Zero-sized `UnsafeCell`: because `UnsafeCell` is `!Sync`, this field keeps
/// `File` from being `Sync`.
_unsync_marker: UnsafeCell<()>,
}
impl File {
    /// Takes ownership of the descriptor inside `fd` and configures it via
    /// [`File::from_raw`].
    fn from_fd<FD: IntoRawFd>(fd: FD) -> io::Result<File> {
        File::from_raw(fd.into_raw_fd())
    }

    /// Configures `fd` for signal-driven I/O and wraps it in a [`File`].
    ///
    /// In order, this adds `O_ASYNC | O_NONBLOCK` to the descriptor's status
    /// flags, selects the signal stored in `SIGNAL` with `F_SETSIG`, and makes
    /// the current process the signal owner with `F_SETOWN`.
    ///
    /// # Errors
    ///
    /// Returns the OS error of the first `fcntl` call that fails.
    ///
    /// # Panics
    ///
    /// Panics if `init` has not stored a signal number in `SIGNAL` yet.
    fn from_raw(fd: RawFd) -> io::Result<File> {
        // Runs `fcntl(fd, <op>, args...)`, resolving `<op>` with `libc::*` in
        // scope, and maps a `-1` return to the last OS error.
        macro_rules! fcntl {
            ($op:ident) => (fcntl!($op,));
            ($op:ident, $($args:expr),*) =>
            ({
                trace!("fcntl({fd}, {})", stringify!($op));
                c_err(unsafe { libc::fcntl(fd, { use libc::*; $op }, $($args),*) })
            });
        }
        trace!("using fnctl on fd {fd}");
        // Check for missing initialisation before touching the descriptor so
        // that the panic does not leave its flags half-modified.
        let signo: c_int =
            *SIGNAL.get().expect("not initialized, please call sigio::init()");
        let flags = fcntl!(F_GETFL)?;
        fcntl!(F_SETFL, flags | O_ASYNC | O_NONBLOCK)?;
        fcntl!(F_SETSIG, signo)?;
        fcntl!(F_SETOWN, libc::getpid())?;
        trace!("fnctl({fd}) successful");
        Ok(File { fd, _unsync_marker: UnsafeCell::new(()) })
    }

    /// Converts a standard library file into a signal-driven [`File`].
    ///
    /// # Errors
    ///
    /// See [`File::from_raw`].
    pub fn from_std(file: std::fs::File) -> io::Result<File> {
        File::from_raw(file.into_raw_fd())
    }

    /// Opens `filepath` with [`std::fs::File::open`] and converts the result
    /// with [`File::from_std`].
    ///
    /// # Errors
    ///
    /// Returns the error from opening the file or from configuring it.
    pub fn open(filepath: impl AsRef<Path>) -> io::Result<File> {
        Self::from_std(StdFile::open(filepath)?)
    }

    /// Creates a pipe and returns its `(read end, write end)` as [`File`]s.
    ///
    /// # Errors
    ///
    /// Returns the OS error if `pipe(2)` fails or if either end cannot be
    /// configured.
    // NOTE(review): if configuring either end fails, the raw descriptors
    // returned by `pipe(2)` are not closed here — confirm whether that is
    // intended.
    pub fn pipe() -> io::Result<(File, File)> {
        let mut fds = [0 as c_int; 2];
        // SAFETY: `fds` is a valid, writable array of two `c_int`s, which is
        // exactly what `pipe(2)` requires.
        c_err(unsafe { libc::pipe(fds.as_mut_ptr()) })?;
        let [read_fd, write_fd] = fds;
        Ok((File::from_raw(read_fd)?, File::from_raw(write_fd)?))
    }

    /// Calls `write(2)` once with `buf` and returns the number of bytes
    /// written, or the OS error if the call returned `-1`.
    fn raw_write(&self, buf: &[u8]) -> io::Result<isize> {
        // SAFETY: the pointer and length describe the readable slice `buf`,
        // which stays borrowed for the duration of the call.
        c_err(unsafe { libc::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) })
    }

    /// Calls `read(2)` once into `buf` and returns the number of bytes read,
    /// or the OS error if the call returned `-1`.
    fn raw_read(&self, buf: &mut [u8]) -> io::Result<isize> {
        // SAFETY: the pointer is derived from the exclusive borrow of `buf`
        // (`as_mut_ptr`, not `as_ptr`), so the kernel may write up to
        // `buf.len()` bytes through it.
        c_err(unsafe { libc::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) })
    }
}
/// Tells the manager thread that this descriptor is going away so it can wake
/// and discard any wakers still queued for it.
// NOTE(review): the descriptor itself is not closed here — confirm that it is
// closed elsewhere or intentionally left open.
impl Drop for File {
    fn drop(&mut self) {
        let fd = self.fd;
        trace!("drop called on {}", fd);
        // Without `init` there is no manager thread to notify.
        let Some(chan) = CHAN.get() else {
            return;
        };
        // Best effort: a send failure only means the manager is already gone.
        let _ = chan.send(Event::Drop(fd));
    }
}
/// Turns the outcome of a non-blocking operation on `fd` into a [`Poll`].
///
/// * `Ok(v)` becomes `Poll::Ready(Ok(conv(v)))`.
/// * An error of kind `WouldBlock` queues the task's waker for `fd` with the
///   manager thread and yields `Poll::Pending`.
/// * Any other error is returned as `Poll::Ready(Err(..))`.
///
/// # Panics
///
/// Panics on the `WouldBlock` path if `CHAN` is unset or the manager thread's
/// receiver is gone.
fn poll_io<T, R, F>(fd: RawFd, ctx: &mut Context<'_>, res: io::Result<T>, conv: F)
    -> Poll<io::Result<R>>
where
    F: FnOnce(T) -> R,
{
    let err = match res {
        Ok(value) => return Poll::Ready(Ok(conv(value))),
        Err(err) => err,
    };
    if err.kind() != io::ErrorKind::WouldBlock {
        return Poll::Ready(Err(err));
    }
    trace!("got WouldBlock on fd {}, sending Queue event", fd);
    let chan = CHAN.get().unwrap();
    chan.send(Event::Queue(fd, ctx.waker().clone())).unwrap();
    Poll::Pending
}
#[cfg(all(test, feature = "tokio"))]
mod tests {
    use super::*;
    use tokio::runtime;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use std::io::Write;

    /// Reading `/dev/null` to the end yields an empty string.
    #[test]
    fn it_works() {
        init().unwrap();
        let rt = runtime::Builder::new_current_thread().build().unwrap();
        let mut out = String::new();
        let mut devnull = File::open("/dev/null").unwrap();
        let _guard = rt.enter();
        rt.block_on(devnull.read_to_string(&mut out)).unwrap();
        assert_eq!(out, "");
    }

    /// Reads asynchronously from a pipe whose write end is a blocking
    /// `std::fs::File`.
    // NOTE(review): on a current-thread runtime the spawned task is presumably
    // not polled until `block_on` below, i.e. after the sleep and the write —
    // so this may never exercise the pending/wake-up path. Confirm.
    #[test_log::test]
    fn pipe_mixed() {
        use std::time::Duration;
        const SLEEP_TIME: Duration = Duration::from_secs(4);
        init().unwrap();
        log::info!("start");
        // `nix::unistd::pipe` returns (read end, write end).
        let (read_end, write_end) = nix::unistd::pipe().unwrap();
        log::info!("created pipe {read_end:?} {write_end:?}");
        let mut writer: StdFile = write_end.into();
        let mut reader = File::from_fd(read_end).unwrap();
        let rt = runtime::Builder::new_current_thread().enable_time().build().unwrap();
        let out_task = rt.spawn(async move {
            let mut out = String::new();
            log::info!("reading from pipe...");
            reader.read_to_string(&mut out).await.unwrap();
            log::info!("read from pipe.");
            std::hint::black_box(reader);
            assert_eq!(out, "test123");
        });
        let _guard = rt.enter();
        std::thread::yield_now();
        std::thread::sleep(SLEEP_TIME);
        log::info!("writing to pipe...");
        writer.write_all(b"test123").unwrap();
        log::info!("wrote to pipe.");
        // Closing the write end lets `read_to_string` observe end-of-file.
        drop(writer);
        rt.block_on(out_task).unwrap();
    }

    /// Writes to and reads from both ends of a `File::pipe` concurrently.
    #[tokio::test]
    async fn pipe_full_async() -> io::Result<()> {
        crate::init()?;
        let (mut rd, mut wr) = File::pipe().unwrap();
        let mut buf = [0; 3];
        let (_, nr) = tokio::try_join!(
            wr.write_all(b"abc"),
            rd.read_exact(&mut buf),
        )?;
        assert_eq!(nr, 3);
        assert_eq!(&buf, b"abc");
        Ok(())
    }
}