// sigio 0.1.0 — signal-based async io
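//! Async I/O driven by SIGIO.
//!
//! Overriding the SIGIO handler, or the handler for the real-time signal this
//! crate selects with `F_SETSIG` (`SIGRTMIN() + 3`), will cause futures created
//! by this crate to hang.
//! `File` implements traits from tokio, but it should work just as well with
//! other async runtimes.
//!
//! Only works on descriptors that support signal-driven I/O: on Linux these are
//! terminals, pseudoterminals, sockets, pipes and FIFOs (see `O_ASYNC` in open(2)).
//! See also [the GNU libc manual on asynchronous I/O signals](https://www.gnu.org/software/libc/manual/html_node/Asynchronous-I_002fO-Signals.html).
//! Only tested on glibc Linux.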

use std::os::fd::{IntoRawFd, RawFd};
use std::io;
use std::fs::File as StdFile;
use std::path::Path;
use std::task::Waker;
use std::task::Poll;
use std::task::Context;
use std::sync::{OnceLock, atomic::{AtomicBool, Ordering}};
use std::sync::mpsc::{channel, Sender};
use std::cell::UnsafeCell;
use std::ffi::{c_void, c_int};
use std::pin::Pin;
use std::collections::HashMap;

use log::*;
use libc::{O_ASYNC, O_NONBLOCK};

mod compat;
mod convert;
mod gen;

use crate::gen::siginfo_t;
/// `fcntl` command that chooses which signal an `O_ASYNC` descriptor raises,
/// taken from the generated bindings in `crate::gen`.
const F_SETSIG: c_int = crate::gen::F_SETSIG as c_int;

/// Turns a libc-style return value into an [`io::Result`].
///
/// libc reports failure by returning `-1` and setting `errno`; that case
/// becomes `Err(io::Error::last_os_error())`. Any other value is passed
/// through unchanged as `Ok`.
fn c_err<T: From<i8> + PartialEq>(retval: T) -> io::Result<T> {
	if retval != <T as From<i8>>::from(-1) {
		return Ok(retval);
	}
	Err(io::Error::last_os_error())
}

/// A message for the manager thread spawned by `init`.
#[derive(Debug)]
enum Event {
	/// The signal handler reported readiness on this fd.
	Wakeup(RawFd),
	/// Sometimes `siginfo_t` doesn't carry enough information to identify
	/// the exact file; every waiting task is woken instead.
	WakeAll,
	/// A task got `WouldBlock` on this fd and wants to be woken later.
	Queue(RawFd, Waker),
	/// The `File` for this fd was dropped: wake its tasks and forget it.
	Drop(RawFd),
}

/// Sending half of the manager thread's event channel, set by `init`.
static CHAN: OnceLock<Sender<Event>> = OnceLock::new();
/// Process id recorded by `init` (from `getpid`).
static HANDLER_PID: OnceLock<libc::pid_t> = OnceLock::new();
/// Signal number `init` installed the handler for; each `File` selects it
/// with `F_SETSIG`.
static SIGNAL: OnceLock<c_int> = OnceLock::new();
/// Set by `init` so that repeated calls return early.
static INIT_STARTED: AtomicBool = AtomicBool::new(false);
/// initialize the signal handler for SIGIO
///
/// must be called before doing any file operations using this crate
// maybe could be run lazily as needed? would locking that behind a feature flag be too much?
pub fn init() -> io::Result<()> {
	// prevent double init, mostly for the benifit of testing
	
	if INIT_STARTED.swap(true, Ordering::Relaxed) {
		return Ok(());
	}
	let (snd, rcv) = channel();
	std::thread::spawn(move || {
		// will there ever be more than one Waker associated with an fd??
		// maybe if we implmented pread for thread sharing??
		let mut m: HashMap<RawFd, Vec<Waker>> = HashMap::new();
		while let Ok(ev) = rcv.recv() {
			trace!("got event {ev:?}");
			match ev {
				Event::Queue(fd, waker) => {
					// TODO: check if waker is a noop waker, and avoid adding it to the queue if it is?
					trace!("queuing waker on fd {fd}");
					let wakers = m.entry(fd).or_default();
					wakers.push(waker);
				}
				Event::WakeAll =>
					m.iter_mut()
					.for_each(|(_, v)| v.drain(..).for_each(Waker::wake)),
				Event::Wakeup(fd) | Event::Drop(fd) => {
					// SIGIO can be sent before any data is requested
					if let Some(wakers) = m.get_mut(&fd) {
						trace!("waking up {} futures on fd {fd}", wakers.len());
						wakers.drain(..).for_each(Waker::wake);
						if let Event::Drop(_) = ev {
							m.remove(&fd);
						}
					}
				}
			}
		}
		log::error!("sigio manager thread exited");
	});
	CHAN.set(snd).expect("init called more than once");
	unsafe {
		// TODO: error handling
		let signo = libc::SIGRTMIN() + 3;
		let mut sset = std::mem::zeroed();
		libc::sigfillset(&mut sset);
		trace!("installing signal handler for signal {signo}");
		let act = libc::sigaction{
			sa_flags: libc::SA_SIGINFO,
			sa_mask: sset.clone(),
			sa_sigaction: handler as extern "C" fn(_, _, _) as libc::sighandler_t,
			sa_restorer: None,
		};
		libc::sigaction(signo, &act, std::ptr::null_mut());
		// if the maximum number of realtime signals are queued,
		// linux will revert to sending SIGIO.
		libc::sigaction(libc::SIGIO, &act, std::ptr::null_mut());
		let pid = libc::getpid();
		//dbg!(pid);
		HANDLER_PID.set(pid).unwrap();
		SIGNAL.set(signo).unwrap();
	}

	Ok(())
}


// there's a lot of type punning going on here
extern fn handler(signal: c_int, info: &siginfo_t, _: *const ()) {
	// TODO: check si_code for POLL_IN
	let fd = info.si_fd();
	let code = info.si_code;
	log::trace!("got signal {signal} for fd {fd} (si_code {code})");
	let ev = if code == 0x80 {
		// got SI_KERNEL instead of POLL_IN,
		// si_fd does not contain info
		Event::WakeAll
	} else {
		Event::Wakeup(fd)
	};
	CHAN.get().unwrap().send(ev).unwrap();
}

/// A file descriptor driven by signal-based (`O_ASYNC`) I/O.
///
/// When an operation would block, the task's waker is handed to a manager
/// thread, which wakes it once the signal installed by `init` reports
/// readiness.
pub struct File {
	/// The underlying raw file descriptor.
	fd: RawFd,
	/// `UnsafeCell` is `!Sync`, which keeps a `File` from being shared
	/// between threads by reference.
	_unsync_marker: UnsafeCell<()>,
}

impl File {
	/// Wraps any owned file descriptor, configuring it as [`File::from_raw`] does.
	///
	/// May cause unexpected results if the fd has unusual flags set.
	fn from_fd<FD: IntoRawFd>(fd: FD) -> io::Result<File> {
		File::from_raw(fd.into_raw_fd())
	}

	/// Takes ownership of the raw descriptor `fd` and prepares it for
	/// signal-driven I/O.
	///
	/// Adds `O_ASYNC | O_NONBLOCK` to the existing status flags, selects the
	/// signal chosen by [`init`] with `F_SETSIG`, and directs it at this
	/// process with `F_SETOWN`.
	///
	/// # Errors
	///
	/// Returns the first failing `fcntl` error. `fd` is closed in that case,
	/// since the caller has already given up ownership of it.
	///
	/// # Panics
	///
	/// Panics (after closing `fd`) if [`init`] has not completed.
	fn from_raw(fd: RawFd) -> io::Result<File> {
		macro_rules! fcntl {
			($op:ident) => (fcntl!($op,));
			($op:ident, $($args:expr),*) =>
				({
					trace!("fcntl({fd}, {})", stringify!($op));
					c_err(unsafe { libc::fcntl(fd, { use libc::*; $op }, $($args),*) })
				});
		}
		// make sure we pass the actual value (a c_int), not a pointer:
		// variadics bypass type checking entirely.
		let signo: c_int = match SIGNAL.get() {
			Some(&signo) => signo,
			None => {
				// SAFETY: `fd` was handed to us and nothing else refers to it.
				unsafe { libc::close(fd) };
				panic!("not initialized, please call sigio::init()");
			}
		};
		trace!("using fnctl on fd {fd}");
		let configured = (|| -> io::Result<()> {
			let flags = fcntl!(F_GETFL)?;
			fcntl!(F_SETFL, flags | O_ASYNC | O_NONBLOCK)?;
			fcntl!(F_SETSIG, signo)?;
			fcntl!(F_SETOWN, libc::getpid())?;
			Ok(())
		})();
		if let Err(err) = configured {
			// We own `fd`; without this it would leak on every failure.
			// SAFETY: nothing else refers to `fd`.
			unsafe { libc::close(fd) };
			return Err(err);
		}
		trace!("fnctl({fd}) successful");
		Ok(File { fd, _unsync_marker: UnsafeCell::new(()) })
	}

	/// Converts a standard library file, taking ownership of its descriptor.
	pub fn from_std(file: std::fs::File) -> io::Result<File> {
		File::from_raw(file.into_raw_fd())
	}

	/// Opens `filepath` with [`std::fs::File::open`] and configures it for
	/// signal-driven I/O.
	pub fn open(filepath: impl AsRef<Path>) -> io::Result<File> {
		Self::from_std(StdFile::open(filepath)?)
	}

	/// Creates a connected pipe, returned as `(read end, write end)`.
	///
	/// Both ends are close-on-exec, matching the descriptors `std` opens, so
	/// they are not inherited by child processes.
	pub fn pipe() -> io::Result<(File, File)> {
		let mut fds = [0 as c_int; 2];
		// SAFETY: `fds` has room for the two descriptors pipe2 writes.
		c_err(unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) })?;
		let [read_fd, write_fd] = fds;
		let reader = match File::from_raw(read_fd) {
			Ok(reader) => reader,
			Err(err) => {
				// The write end was never handed to `from_raw`, so it is
				// still ours to close.
				// SAFETY: nothing else refers to `write_fd`.
				unsafe { libc::close(write_fd) };
				return Err(err);
			}
		};
		match File::from_raw(write_fd) {
			Ok(writer) => Ok((reader, writer)),
			Err(err) => {
				// Close the read end explicitly. Forgetting `reader` skips
				// `Drop`, which has no waiting tasks to notify for a brand-new fd.
				std::mem::forget(reader);
				// SAFETY: `reader` was forgotten, so this is the only close.
				unsafe { libc::close(read_fd) };
				Err(err)
			}
		}
	}

	/// Performs a single raw `write(2)` of `buf` on the contained descriptor,
	/// returning the number of bytes written.
	fn raw_write(&self, buf: &[u8]) -> io::Result<isize> {
		// SAFETY: `buf` is valid for reads of `buf.len()` bytes.
		c_err(unsafe { libc::write(self.fd, buf.as_ptr().cast::<c_void>(), buf.len()) })
	}

	/// Performs a single raw `read(2)` into `buf` on the contained descriptor,
	/// returning the number of bytes read.
	fn raw_read(&self, buf: &mut [u8]) -> io::Result<isize> {
		// SAFETY: `buf` is valid for writes of `buf.len()` bytes. The pointer
		// must come from `as_mut_ptr`: writing through one derived from
		// `as_ptr` (a shared borrow) is undefined behavior.
		c_err(unsafe { libc::read(self.fd, buf.as_mut_ptr().cast::<c_void>(), buf.len()) })
	}
}

impl Drop for File {
	fn drop(&mut self) {
		trace!("drop called on {}", self.fd);
		//let _ = unsafe { libc::close(self.fd) };
		if let Some(ch) = CHAN.get() {
			let _ = ch.send(Event::Drop(self.fd));
		}
	}
}

/// Turns the outcome of a non-blocking syscall on `fd` into a [`Poll`].
///
/// * `Ok(value)` is ready, and `conv(value)` is returned.
/// * `WouldBlock` hands the task's waker to the manager thread (a `Queue`
///   event) and returns `Pending`.
/// * Any other error is returned as-is.
///
/// # Panics
///
/// Panics if `init` has not run or the manager thread's receiver is gone.
fn poll_io<T, R, F>(fd: RawFd, ctx: &mut Context<'_>, res: io::Result<T>, conv: F)
					-> Poll<io::Result<R>>
	where F: FnOnce(T) -> R
{
	let err = match res {
		Ok(value) => return Poll::Ready(Ok(conv(value))),
		Err(err) => err,
	};
	if err.kind() != io::ErrorKind::WouldBlock {
		return Poll::Ready(Err(err));
	}
	trace!("got WouldBlock on fd {}, sending Queue event", fd);
	let chan = CHAN.get().unwrap();
	chan.send(Event::Queue(fd, ctx.waker().clone())).unwrap();
	Poll::Pending
}



#[cfg(all(test, feature = "tokio"))]
mod tests {
	use super::*;
	use tokio::runtime;
	use tokio::io::{AsyncReadExt, AsyncWriteExt};
	use std::io::Write;

	/// Reading /dev/null through this crate yields an empty string.
	#[test]
	fn it_works() {
		init().unwrap();
		let rt = runtime::Builder::new_current_thread().build().unwrap();
		let mut out = String::new();
		let mut devnull = File::open("/dev/null").unwrap();
		rt.block_on(devnull.read_to_string(&mut out)).unwrap();
		assert_eq!(out, "");
	}

	/// Reads through this crate while a plain std thread writes the other
	/// end of the pipe after a delay, so the read must be woken by the signal.
	#[test_log::test]
	fn pipe_mixed() {
		use std::time::{Duration, Instant};
		const SLEEP_TIME: Duration = Duration::from_secs(4);
		init().unwrap();
		log::info!("start");
		let (o, i) = nix::unistd::pipe().unwrap();
		log::info!("created pipe {o:?} {i:?}");
		let mut infile: StdFile = i.into();
		let mut o = File::from_fd(o).unwrap();
		// Previously the write happened before the reader task ever ran (a
		// current-thread runtime only runs tasks inside block_on), so the
		// data was already there and the signal path was bypassed entirely.
		// Writing from another thread after a delay guarantees the read first
		// hits WouldBlock.
		let writer = std::thread::spawn(move || {
			std::thread::sleep(SLEEP_TIME);
			log::info!("writing to pipe...");
			infile.write_all(b"test123").unwrap();
			log::info!("wrote to pipe.");
			// `infile` is dropped here, closing the write end so the reader sees EOF.
		});
		let rt = runtime::Builder::new_current_thread().enable_time().build().unwrap();
		let mut out = String::new();
		log::info!("reading from pipe...");
		let read_start = Instant::now();
		rt.block_on(o.read_to_string(&mut out)).unwrap();
		let t = read_start.elapsed();
		log::info!("read from pipe.");
		writer.join().unwrap();
		assert_eq!(out, "test123");
		// Half the delay leaves plenty of slack for scheduling jitter while
		// still proving the read actually waited for the writer.
		assert!(t >= SLEEP_TIME / 2, "only took {t:?} to read from pipe");
	}

	#[tokio::test]
	async fn pipe_full_async() -> io::Result<()> {
		crate::init()?;
		let (mut rd, mut wr) = File::pipe()?;
		let mut buf = [0; 3];
		let ((), nr) = tokio::try_join!(
			wr.write_all(b"abc"),
			rd.read_exact(&mut buf),
		)?;
		assert_eq!(nr, 3);
		assert_eq!(&buf, b"abc");
		Ok(())
	}
	// TODO: test what happens when you drop() one side of a sigio-backed pipe
}