erin 0.1.0

A very simple I/O reactor that allows creating green thread-like processes.
Documentation
//! Poll-based runtime. This is a single-threaded runtime using a `poll` loop.
//!
//! TODO(stevenroose) in theory we could have identified wakers too, allowing processes to
//! create new wakers and be notified of which waker was used

use std::{io, thread};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, SystemTime};

use crossbeam_channel as chan;
use log::*;
use popol;

use crate::{Process, Waker};
use crate::error::Error;
use crate::timeouts::TimeoutManager;

pub use popol::{Interest};

/// Maximum amount of time to wait for I/O.
///
/// Used by the runtime loop as the poll timeout whenever the timeout manager
/// does not yield a shorter one.
const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);

/// Type to identify processes added to the runtime.
///
/// Ids are handed out sequentially by the runtime loop, starting at 0.
type ProcessId = usize;

/// Opaque identifier for an I/O source registered by a process.
///
/// Returned when registering a source and carried by the I/O events that are
/// reported for it.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct IoToken(usize);

impl IoToken {
	/// A token that is guaranteed to never be used as a real token.
	pub const NULL: Self = Self(0);
}

impl Default for IoToken {
	/// The default token is [IoToken::NULL].
	fn default() -> Self {
		Self::NULL
	}
}

/// Opaque identifier for a timer or alarm set by a process.
///
/// Returned when setting a timer and reported back when that timer fires.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct TimerToken(usize);

impl TimerToken {
	/// A token that is guaranteed to never be used as a real token.
	pub const NULL: Self = Self(0);
}

impl Default for TimerToken {
	/// The default token is [TimerToken::NULL].
	fn default() -> Self {
		Self::NULL
	}
}

/// Crate-private counterpart of [From].
///
/// Lets the runtime build tokens from raw integers without exposing a public
/// conversion to users of the crate.
trait PrivateFrom<T> {
	/// Build `Self` from the given value.
	fn from(_: T) -> Self;
}

impl PrivateFrom<usize> for IoToken {
	fn from(raw: usize) -> Self {
		IoToken(raw)
	}
}

impl PrivateFrom<usize> for TimerToken {
	fn from(raw: usize) -> Self {
		TimerToken(raw)
	}
}

/// An I/O event reported for one of a process's registered sources.
#[derive(Clone, Copy, Debug)]
pub struct IoEvent {
	/// Token of the source the event belongs to.
	pub token: IoToken,
	/// The `source` value copied from the underlying popol event.
	pub src: popol::Source,
}

/// A single reason for which a process is being woken up.
#[derive(Clone, Copy, Debug)]
pub enum Event {
	/// An event on one of the process's I/O sources.
	Io(IoEvent),
	/// One of the process's timers or alarms fired.
	Timer(TimerToken),
	/// The process's waker was triggered.
	Waker,
}

impl Event {
	/// Returns the I/O event if this is an [Event::Io], `None` otherwise.
	pub fn io(self) -> Option<IoEvent> {
		match self {
			Event::Io(ev) => Some(ev),
			Event::Timer(_) | Event::Waker => None,
		}
	}

	/// Returns the timer token if this is an [Event::Timer], `None` otherwise.
	pub fn timer(self) -> Option<TimerToken> {
		match self {
			Event::Timer(token) => Some(token),
			Event::Io(_) | Event::Waker => None,
		}
	}

	/// Whether this is an [Event::Waker].
	pub fn is_waker(self) -> bool {
		matches!(self, Event::Waker)
	}
}

/// Iterator over the events of a single process; returned by various
/// [Events] methods.
///
/// It walks the poll events first and the expired timers second, using one
/// running index over both lists.
pub struct EventsIter<'a> {
	events: &'a Events<'a>,
	/// Whether entries from the poll list (I/O and waker events) are yielded.
	include_io: bool,
	/// Whether entries from the expired-timer list are yielded.
	include_timers: bool,
	/// Position in the concatenation of the poll list and the timer list.
	idx: usize,
}

impl<'a> Iterator for EventsIter<'a> {
	type Item = Event;

	fn next(&mut self) -> Option<Self::Item> {
		let events = self.events;
		let poll_len = events.poll.len();
		let total = poll_len + events.timeouts.len();

		while self.idx < total {
			let cur = self.idx;
			self.idx += 1;

			if cur < poll_len {
				// First segment: poll events.
				if !self.include_io {
					continue;
				}
				let entry = &events.poll[cur];
				match entry.key {
					Source::ProcWaker { pid } if pid == events.pid => {
						return Some(Event::Waker);
					}
					Source::Io { pid, token } if pid == events.pid => {
						return Some(Event::Io(IoEvent { token, src: entry.source }));
					}
					// Runtime-internal or owned by another process: skip.
					Source::RuntimeWaker | Source::ProcWaker { .. } | Source::Io { .. } => {}
				}
			} else {
				// Second segment: expired timers.
				if !self.include_timers {
					continue;
				}
				let key = &events.timeouts[cur - poll_len];
				if key.pid == events.pid {
					return Some(Event::Timer(key.token));
				}
			}
		}
		None
	}
}

/// Get access to events relevant for the process.
#[derive(Clone)]
pub struct Events<'a> {
	pid: ProcessId,
	poll: &'a Vec<popol::Event<Source>>,
	timeouts: &'a Vec<TimerKey>,
}

impl<'a> Events<'a> {
	/// Iterate all events.
	pub fn iter(&'a self) -> EventsIter<'a> {
		EventsIter {
			events: self,
			include_io: true,
			include_timers: true,
			idx: 0,
		}
	}

	/// Get the number of events.
	///
	/// Note that this method has the same cost as iterating through
	/// [Events::iter].
	pub fn len(&self) -> usize {
		self.iter().count()
	}

	/// Iterate only I/O events.
	pub fn io(&'a self) -> impl Iterator<Item = IoEvent> + 'a {
		EventsIter {
			events: self,
			include_io: true,
			include_timers: false,
			idx: 0,
		}.map(|e| e.io().unwrap())
	}

	/// Iterate only timer events.
	pub fn timers(&'a self) -> impl Iterator<Item = TimerToken> + 'a {
		EventsIter {
			events: self,
			include_io: false,
			include_timers: true,
			idx: 0,
		}.map(|e| e.timer().unwrap())
	}

	/// Check if our waker has been called.
	pub fn waker(&self) -> bool {
		self.poll.iter().any(|e| match e.key {
			Source::ProcWaker { pid } => pid == self.pid,
			Source::RuntimeWaker => false,
			Source::Io { .. } => false
		})
	}
}

impl<'a, 's: 'a> IntoIterator for &'s Events<'a> {
	type Item = Event;
	type IntoIter = EventsIter<'a>;

	fn into_iter(self) -> Self::IntoIter {
		self.iter()
	}
}

/// Passed to a process on wakeup. Used to register and unregister new
/// I/O sources and timers.
pub struct RuntimeHandle<'a> {
	/// The process this handle acts on behalf of.
	pid: ProcessId,

	// Main thread variables, mutably borrowed from the runtime loop.
	sources: RefCell<&'a mut popol::Sources<Source>>,
	/// The I/O tokens currently registered by this process.
	io_tokens: RefCell<&'a mut HashSet<IoToken>>,
	io_token_tally: RefCell<&'a mut TokenTally<IoToken>>,
	timeout_mgr: RefCell<&'a mut TimeoutManager<TimerKey>>,
	timer_token_tally: RefCell<&'a mut TokenTally<TimerToken>>,
	/// Source key used for wakers created through [RuntimeHandle::new_waker].
	waker_src: Source,
}

impl<'a> RuntimeHandle<'a> {
	/// Build the poll source key for one of this process's I/O tokens.
	fn io_source(&self, token: IoToken) -> Source {
		Source::Io { pid: self.pid, token }
	}

	/// Build the timeout manager key for one of this process's timer tokens.
	fn timer_key(&self, token: TimerToken) -> TimerKey {
		TimerKey { pid: self.pid, token }
	}

	/// Register a new I/O source.
	///
	/// Returns the token under which events for the source will be reported.
	/// To change the interest of a source, use [RuntimeHandle::reregister_io]
	/// instead.
	pub fn register_io(&self, fd: &impl AsRawFd, events: Interest) -> IoToken {
		let token = self.io_token_tally.borrow_mut().next();
		self.sources.borrow_mut().register(self.io_source(token), fd, events);
		// Keep track of the token for this process.
		self.io_tokens.borrow_mut().insert(token);
		token
	}

	/// Change the events you are interested in for this I/O source.
	pub fn reregister_io(&self, token: IoToken, events: Interest) {
		let key = self.io_source(token);
		self.sources.borrow_mut().set(&key, events);
	}

	/// Stop receiving events for this I/O source.
	pub fn unregister_io(&self, token: IoToken) {
		let key = self.io_source(token);
		self.sources.borrow_mut().unregister(&key);
		self.io_tokens.borrow_mut().remove(&token);
	}

	/// Set a timer to wake you up at the given time.
	///
	/// Returns the token that is reported when the alarm fires.
	pub fn set_alarm(&self, time: SystemTime) -> TimerToken {
		let token = self.timer_token_tally.borrow_mut().next();
		self.timeout_mgr.borrow_mut().register(self.timer_key(token), time);
		token
	}

	/// Set a timer to be woken up after the given duration.
	///
	/// Panics if adding the duration to the current time overflows.
	pub fn set_timer(&self, timer: Duration) -> TimerToken {
		let deadline = SystemTime::now().checked_add(timer).expect("time overflow");
		self.set_alarm(deadline)
	}

	/// Cancel previously set timer or alarm.
	pub fn cancel_timer(&self, token: TimerToken) {
		let key = self.timer_key(token);
		self.timeout_mgr.borrow_mut().unregister(key);
	}

	/// Cancel all previously set timers and alarms.
	pub fn cancel_all_timers(&self) {
		let own_pid = self.pid;
		self.timeout_mgr.borrow_mut().retain_by_key(|key| key.pid != own_pid);
	}

	/// Create a new waker for this process.
	///
	/// Panics if the waker cannot be created.
	pub fn new_waker(&self) -> Waker {
		Waker::new(&mut *self.sources.borrow_mut(), self.waker_src)
			.expect("failed to create waker")
	}
}

/// A handle to a process running in an erin [Runtime].
#[derive(Clone)]
pub struct ProcessHandle {
	proc_waker: Waker,
	rt_waker: Waker,
	rt_ctrl_tx: chan::Sender<Ctrl>,
}

impl ProcessHandle {
	/// Wake up the process.
	pub fn wake(&self) -> Result<(), io::Error> {
		self.proc_waker.wake();
		Ok(())
	}

	/// Trigger shutdown of the process.
	pub fn shutdown(&self) -> Result<(), Error> {
		self.rt_ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
		self.rt_waker.wake();
		Ok(())
	}

	/// Create a new waker that can be used to wake up the process.
	///
	/// Note that you can use the [wake] method directly. This method
	/// allows you to create a more light-weight waker to pass around.
	///
	/// Use [into_waker] if you only want the waker and don't need the rest.
	pub fn clone_waker(&self) -> Waker {
		self.proc_waker.clone()
	}

	/// If you only need the waker part of the handle, you can use this
	/// method to drop the other parts.
	pub fn into_waker(self) -> Waker {
		self.proc_waker
	}
}

/// Owner of the runtime thread.
///
/// Created with [Runtime::start]; processes are added with
/// [Runtime::add_process] and everything is torn down with
/// [Runtime::shutdown].
pub struct Runtime {
	/// Waker used to interrupt the runtime thread's blocking poll.
	waker: Waker,
	/// Control channel towards the runtime thread.
	ctrl_tx: chan::Sender<Ctrl>,
	/// Join handle of the runtime thread.
	join_handle: thread::JoinHandle<()>,
}

impl Runtime {
	/// Start the runtime on a new thread named `erin_runtime`.
	///
	/// Returns an error if the runtime waker cannot be created or the thread
	/// cannot be spawned.
	pub fn start() -> Result<Runtime, io::Error> {
		// The control channel must be able to buffer messages. Every sender
		// first sends and only then wakes the runtime thread, while the
		// runtime thread only looks at the channel after it has been woken.
		// With a zero-capacity (rendezvous) channel `send` would block until
		// the message is received, so the wake-up would never be issued.
		let (ctrl_tx, ctrl_rx) = chan::unbounded();

		let mut sources = popol::Sources::new();
		let waker = Waker::new(&mut sources, Source::RuntimeWaker)?;
		let jh = thread::Builder::new()
			.name("erin_runtime".into())
			.spawn(move || {
				run(ctrl_rx, sources);
			})?;

		Ok(Runtime {
			waker,
			ctrl_tx,
			join_handle: jh,
		})
	}

	/// Adds a new process to the runtime.
	///
	/// Returns a [ProcessHandle] that can be used to wake up and shut down the
	/// process.
	///
	/// Returns [Error::RuntimeProcessDied] if the runtime thread is gone and
	/// [Error::SetupFailed] if the runtime reports that setting up the
	/// process failed.
	///
	/// NB This call depends on the runtime process waking up and responding to us.
	/// While this should generally not be a possibility, it is important that no
	/// calls to any of your [Process::wakeup] methods will be waiting on a lock
	/// that is held when calling this method.
	pub fn add_process(&self, process: Box<dyn Process>) -> Result<ProcessHandle, Error> {
		// This will be the response channel where we will wait for a waker on.
		let (waker_tx, waker_rx) = chan::bounded(1);
		self.ctrl_tx.send(Ctrl::NewProcess { process, waker_tx })
			.map_err(|_| Error::RuntimeProcessDied)?;

		// Wake up the runtime thread that will likely be in a blocking call.
		self.waker.wake();

		// Outer error: the runtime dropped the response channel.
		// Inner error: the runtime reported a failed setup.
		let proc_waker = waker_rx.recv()
			.map_err(|_| Error::RuntimeProcessDied)?
			.map_err(|()| Error::SetupFailed)?;
		Ok(ProcessHandle {
			proc_waker,
			rt_ctrl_tx: self.ctrl_tx.clone(),
			rt_waker: self.waker.clone(),
		})
	}

	/// Shut down the runtime.
	///
	/// This will call the [Process::shutdown] function on all processes
	/// before returning.
	///
	/// Returns [Error::RuntimeProcessDied] if the runtime thread is already
	/// gone and [Error::Thread] if joining the runtime thread fails.
	pub fn shutdown(self) -> Result<(), Error> {
		self.ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
		self.waker.wake();
		self.join_handle.join().map_err(Error::Thread)?;
		Ok(())
	}
}

/// Key under which event sources are registered with the poller.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Source {
	/// The waker of the runtime thread itself.
	RuntimeWaker,
	/// An I/O source registered by a process.
	Io {
		pid: ProcessId,
		token: IoToken,
	},
	/// The waker of a process.
	ProcWaker {
		pid: ProcessId,
	},
}

impl Source {
	/// The process this source belongs to; `None` for the runtime waker.
	fn pid(self) -> Option<ProcessId> {
		match self {
			Source::Io { pid, .. } | Source::ProcWaker { pid } => Some(pid),
			Source::RuntimeWaker => None,
		}
	}
}

/// Used to safely create new enumerated tokens.
///
/// This implementation never emits a token with integer value 0.
struct TokenTally<T> {
	tally: usize,
	_pd: PhantomData<T>,
}

impl<T: PrivateFrom<usize>> TokenTally<T> {
	/// Create a new [TokenTally] that starts with value 1.
	fn new() -> TokenTally<T> {
		TokenTally {
			tally: 1,
			_pd: PhantomData,
		}
	}

	/// Get the next token to use.
	fn next(&mut self) -> T {
		let next = self.tally;
		self.tally += 1;
		assert_ne!(next, usize::max_value(), "token overflow");
		T::from(next)
	}
}

/// Key used for the timeout manager.
///
/// Combines the owning process with the token that was handed to it, so that
/// expired timers can be attributed to the right process.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TimerKey {
	/// The process that set the timer.
	pid: ProcessId,
	/// The token returned to the process when the timer was set.
	token: TimerToken,
}

/// Control messages sent to the runtime thread over its control channel.
enum Ctrl {
	/// Ask the runtime to set up and start driving a new process.
	NewProcess {
		/// The process to add.
		process: Box<dyn Process>,
		/// The channel where we are expected to reply with a waker
		/// for the new process.
		waker_tx: chan::Sender<Result<Waker, ()>>,
	},
	/// Ask the runtime to shut down all processes and stop.
	Shutdown,
}

/// The main run method for the runtime.
fn run(
	ctrl_rx: chan::Receiver<Ctrl>,
	sources: popol::Sources<Source>,
) {
	info!("Initializing service..");

	// Core variables.
	let mut sources = sources;
	let mut pid_tally = 0;
	let mut processes = HashMap::<ProcessId, (Box<dyn Process>, HashSet<IoToken>)>::new();
	let mut io_token_tally = TokenTally::new();
	let mut timeout_mgr = TimeoutManager::new();
	let mut timer_token_tally = TokenTally::new();

	// Buffers to keep single allocation.
	let mut poll_events = Vec::<popol::Event<Source>>::with_capacity(32);
	let mut timeouts = Vec::<TimerKey>::with_capacity(32);
	let mut dead_procs = Vec::new();

	loop {
		let timeout = timeout_mgr
			.next(SystemTime::now())
			.unwrap_or(WAIT_TIMEOUT)
			.into();

		trace!(
			"Polling {} source(s) and {} timeout(s), waking up in {:?}..",
			sources.len(), timeout_mgr.len(), timeout,
		);

		poll_events.clear();
		let ret = sources.wait_timeout(&mut poll_events, timeout); // Blocking.
		if let Err(err) = ret {
			if err.kind() != io::ErrorKind::TimedOut {
				error!("popol returned an error: {:?}", err);
				return;
			}
		}

		timeouts.clear();
		timeout_mgr.wake(SystemTime::now().into(), &mut timeouts);

		if poll_events.is_empty() && timeouts.is_empty() {
			continue;
		}

		if poll_events.iter().any(|e| e.key == Source::RuntimeWaker) {
			while let Ok(ctrl) = ctrl_rx.try_recv() {
				match ctrl {
					Ctrl::NewProcess { mut process, waker_tx } => {
						let pid = pid_tally;
						pid_tally += 1;

						// Call the setup method on the process.
						// Only really add the process if setup doesn't fail.
						let mut io_tokens = HashSet::new();
						let handle = RuntimeHandle {
							pid: pid,
							sources: RefCell::new(&mut sources),
							io_tokens: RefCell::new(&mut io_tokens),
							io_token_tally: RefCell::new(&mut io_token_tally),
							timeout_mgr: RefCell::new(&mut timeout_mgr),
							timer_token_tally: RefCell::new(&mut timer_token_tally),
							waker_src: Source::ProcWaker { pid },
						};
						let ret = if process.setup(&handle).is_ok() {
							processes.insert(pid, (process, io_tokens));
							let waker = Waker::new(&mut sources, Source::ProcWaker { pid })
								.expect("failed to create waker");
							Ok(waker)
						} else {
							error!("Setup method of new process errored. Not adding.");
							Err(())
						};
						if let Err(_) = waker_tx.send(ret) {
							error!(
								"User sent new process (pid {}) and hung up on response channel.",
								pid,
							);
						}
					}
					Ctrl::Shutdown => {
						info!("Shutdown signal received, shutting down processes...");
						for (pid, (proc, _)) in processes.iter_mut() {
							trace!("Shutting down process with pid {}", pid);
							proc.shutdown();
						}
						info!("Shutdown complete");
						return;
					}
				}
			}
		}

		trace!(
			"Woke up with {} I/O source(s) ready and {} timers expired",
			poll_events.len(), timeouts.len(),
		);

		for (pid, (proc, io_tokens)) in processes.iter_mut() {
			let has_poll = poll_events.iter().any(|e| e.key.pid() == Some(*pid));
			let has_timer = timeouts.iter().any(|t| t.pid == *pid);
			if !has_poll && !has_timer {
				continue;
			}
			let handle = RuntimeHandle {
				pid: *pid,
				sources: RefCell::new(&mut sources),
				io_tokens: RefCell::new(io_tokens),
				io_token_tally: RefCell::new(&mut io_token_tally),
				timeout_mgr: RefCell::new(&mut timeout_mgr),
				timer_token_tally: RefCell::new(&mut timer_token_tally),
				waker_src: Source::ProcWaker { pid: *pid },
			};
			let ev = Events {
				pid: *pid,
				poll: &poll_events,
				timeouts: &timeouts,
			};
			if proc.wakeup(&handle, ev).is_err() {
				dead_procs.push(*pid);
			}
		}
		for pid in dead_procs.drain(..) {
			let (_proc, io_tokens) = processes.remove(&pid).unwrap();
			// Unregister all the poll tokens it has.
			for token in io_tokens {
				sources.unregister(&Source::Io { pid, token });
			}
			// Unregister the waker.
			sources.unregister(&Source::ProcWaker { pid });
			// Remove all timers.
			timeout_mgr.retain_by_key(|k| k.pid != pid);
		}
	}
}