use std::{io, thread};
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::os::unix::io::AsRawFd;
use std::time::{Duration, SystemTime};
use crossbeam_channel as chan;
use log::*;
use popol;
use crate::{Process, Waker};
use crate::error::Error;
use crate::timeouts::TimeoutManager;
pub use popol::{Interest};
/// Fallback poll timeout (one hour) used by the runtime loop when the timeout
/// manager reports no pending timer.
const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
/// Identifier the runtime assigns to each process it accepts.
type ProcessId = usize;
/// Opaque token identifying an I/O source registered through a
/// [`RuntimeHandle`].
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct IoToken(usize);

impl IoToken {
    /// The zero-valued token. It is also the value produced by [`Default`].
    pub const NULL: Self = Self(0);
}

impl Default for IoToken {
    /// Returns [`IoToken::NULL`].
    fn default() -> Self {
        Self::NULL
    }
}
/// Opaque token identifying a timer or alarm set through a
/// [`RuntimeHandle`].
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct TimerToken(usize);

impl TimerToken {
    /// The zero-valued token. It is also the value produced by [`Default`].
    pub const NULL: Self = Self(0);
}

impl Default for TimerToken {
    /// Returns [`TimerToken::NULL`].
    fn default() -> Self {
        Self::NULL
    }
}
trait PrivateFrom<T> {
fn from(_: T) -> Self;
}
impl PrivateFrom<usize> for IoToken {
fn from(v: usize) -> IoToken { IoToken(v) }
}
impl PrivateFrom<usize> for TimerToken {
fn from(v: usize) -> TimerToken { TimerToken(v) }
}
/// An I/O readiness notification delivered to a process.
#[derive(Clone, Copy, Debug)]
pub struct IoEvent {
/// Token that identifies the registered source this event belongs to.
pub token: IoToken,
/// The `popol` source value copied from the underlying poll event.
pub src: popol::Source,
}
/// A single event yielded when iterating over [`Events`].
#[derive(Clone, Copy, Debug)]
pub enum Event {
/// One of the process's registered I/O sources produced a poll event.
Io(IoEvent),
/// One of the process's timers expired.
Timer(TimerToken),
/// The process's own waker source produced a poll event.
Waker,
}
impl Event {
    /// Returns the inner [`IoEvent`] if this is an I/O event, `None` otherwise.
    pub fn io(self) -> Option<IoEvent> {
        match self {
            Event::Io(io_event) => Some(io_event),
            Event::Timer(_) | Event::Waker => None,
        }
    }

    /// Returns the inner [`TimerToken`] if this is a timer event, `None`
    /// otherwise.
    pub fn timer(self) -> Option<TimerToken> {
        match self {
            Event::Timer(timer_token) => Some(timer_token),
            Event::Io(_) | Event::Waker => None,
        }
    }

    /// Returns `true` only for the [`Event::Waker`] variant.
    pub fn is_waker(self) -> bool {
        match self {
            Event::Waker => true,
            Event::Io(_) | Event::Timer(_) => false,
        }
    }
}
/// Iterator over the events in an [`Events`] set that belong to its process.
pub struct EventsIter<'a> {
// The event set being iterated.
events: &'a Events<'a>,
// When false, all poll events (I/O and waker) are skipped.
include_io: bool,
// When false, all expired timers are skipped.
include_timers: bool,
// Cursor over the poll events followed by the expired timers.
idx: usize,
}
impl<'a> Iterator for EventsIter<'a> {
    type Item = Event;

    /// Advances over the poll events first and the expired timers second,
    /// returning the next entry that belongs to this process and is enabled
    /// by the `include_*` flags.
    fn next(&mut self) -> Option<Self::Item> {
        let events = self.events;
        let poll_len = events.poll.len();
        let total = poll_len + events.timeouts.len();

        while self.idx < total {
            let cursor = self.idx;
            self.idx += 1;

            if cursor < poll_len {
                if !self.include_io {
                    continue;
                }
                let ready = &events.poll[cursor];
                match ready.key {
                    Source::ProcWaker { pid } if pid == events.pid => {
                        return Some(Event::Waker);
                    }
                    Source::Io { pid, token } if pid == events.pid => {
                        return Some(Event::Io(IoEvent { token, src: ready.source }));
                    }
                    // The runtime's own waker and anything owned by another
                    // process is not reported to this process.
                    Source::RuntimeWaker | Source::ProcWaker { .. } | Source::Io { .. } => {}
                }
            } else {
                if !self.include_timers {
                    continue;
                }
                let expired = &events.timeouts[cursor - poll_len];
                if expired.pid == events.pid {
                    return Some(Event::Timer(expired.token));
                }
            }
        }
        None
    }
}
/// View over the poll events and expired timers of one loop iteration,
/// restricted to a single process when iterated.
#[derive(Clone)]
pub struct Events<'a> {
// The process this view belongs to; used to filter both lists.
pid: ProcessId,
// All poll events of the iteration, for every process.
poll: &'a Vec<popol::Event<Source>>,
// All timers that expired in the iteration, for every process.
timeouts: &'a Vec<TimerKey>,
}
impl<'a> Events<'a> {
pub fn iter(&'a self) -> EventsIter<'a> {
EventsIter {
events: self,
include_io: true,
include_timers: true,
idx: 0,
}
}
pub fn len(&self) -> usize {
self.iter().count()
}
pub fn io(&'a self) -> impl Iterator<Item = IoEvent> + 'a {
EventsIter {
events: self,
include_io: true,
include_timers: false,
idx: 0,
}.map(|e| e.io().unwrap())
}
pub fn timers(&'a self) -> impl Iterator<Item = TimerToken> + 'a {
EventsIter {
events: self,
include_io: false,
include_timers: true,
idx: 0,
}.map(|e| e.timer().unwrap())
}
pub fn waker(&self) -> bool {
self.poll.iter().any(|e| match e.key {
Source::ProcWaker { pid } => pid == self.pid,
Source::RuntimeWaker => false,
Source::Io { .. } => false
})
}
}
impl<'a, 's: 'a> IntoIterator for &'s Events<'a> {
type Item = Event;
type IntoIter = EventsIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Handle given to a process so it can register I/O sources, timers and
/// wakers with the runtime.
///
/// Every piece of runtime state is held as a mutable borrow inside a
/// `RefCell`, which lets the methods take `&self`.
pub struct RuntimeHandle<'a> {
// The process all registrations made through this handle are attributed to.
pid: ProcessId,
// The runtime's poll sources.
sources: RefCell<&'a mut popol::Sources<Source>>,
// The set of I/O tokens currently registered by this process.
io_tokens: RefCell<&'a mut HashSet<IoToken>>,
// Counter that issues new I/O tokens.
io_token_tally: RefCell<&'a mut TokenTally<IoToken>>,
// The runtime's timer bookkeeping.
timeout_mgr: RefCell<&'a mut TimeoutManager<TimerKey>>,
// Counter that issues new timer tokens.
timer_token_tally: RefCell<&'a mut TokenTally<TimerToken>>,
// Source key used when this process creates a waker.
waker_src: Source,
}
impl<'a> RuntimeHandle<'a> {
    /// Registers `fd` with the given interest and returns the fresh token
    /// that identifies it in later events.
    pub fn register_io(&self, fd: &impl AsRawFd, events: Interest) -> IoToken {
        let token = self.io_token_tally.borrow_mut().next();
        let key = Source::Io { pid: self.pid, token };
        self.sources.borrow_mut().register(key, fd, events);
        self.io_tokens.borrow_mut().insert(token);
        token
    }

    /// Replaces the interest set of the source identified by `token`.
    pub fn reregister_io(&self, token: IoToken, events: Interest) {
        let key = Source::Io { pid: self.pid, token };
        self.sources.borrow_mut().set(&key, events);
    }

    /// Removes the source identified by `token` from the poll set and from
    /// this process's token set.
    pub fn unregister_io(&self, token: IoToken) {
        let key = Source::Io { pid: self.pid, token };
        self.sources.borrow_mut().unregister(&key);
        self.io_tokens.borrow_mut().remove(&token);
    }

    /// Registers a timer that expires at the absolute time `time` and
    /// returns its token.
    pub fn set_alarm(&self, time: SystemTime) -> TimerToken {
        let token = self.timer_token_tally.borrow_mut().next();
        let key = TimerKey { pid: self.pid, token };
        self.timeout_mgr.borrow_mut().register(key, time);
        token
    }

    /// Registers a timer that expires `timer` from now and returns its token.
    ///
    /// # Panics
    /// Panics with "time overflow" if `now + timer` is not representable.
    pub fn set_timer(&self, timer: Duration) -> TimerToken {
        let deadline = SystemTime::now()
            .checked_add(timer)
            .expect("time overflow");
        self.set_alarm(deadline)
    }

    /// Cancels the timer identified by `token`.
    pub fn cancel_timer(&self, token: TimerToken) {
        let key = TimerKey { pid: self.pid, token };
        self.timeout_mgr.borrow_mut().unregister(key);
    }

    /// Cancels every timer that belongs to this process.
    pub fn cancel_all_timers(&self) {
        let own_pid = self.pid;
        self.timeout_mgr
            .borrow_mut()
            .retain_by_key(|k| k.pid != own_pid);
    }

    /// Creates a waker keyed on this process's waker source.
    ///
    /// # Panics
    /// Panics with "failed to create waker" if the waker cannot be created.
    pub fn new_waker(&self) -> Waker {
        let mut sources = self.sources.borrow_mut();
        Waker::new(&mut **sources, self.waker_src).expect("failed to create waker")
    }
}
/// Handle to a process that has been added to a [`Runtime`].
#[derive(Clone)]
pub struct ProcessHandle {
// Waker that targets the process itself.
proc_waker: Waker,
// Waker that targets the runtime loop.
rt_waker: Waker,
// Sender side of the runtime's control channel.
rt_ctrl_tx: chan::Sender<Ctrl>,
}
impl ProcessHandle {
pub fn wake(&self) -> Result<(), io::Error> {
self.proc_waker.wake();
Ok(())
}
pub fn shutdown(&self) -> Result<(), Error> {
self.rt_ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
self.rt_waker.wake();
Ok(())
}
pub fn clone_waker(&self) -> Waker {
self.proc_waker.clone()
}
pub fn into_waker(self) -> Waker {
self.proc_waker
}
}
/// Owner of the runtime thread.
pub struct Runtime {
// Waker that targets the runtime loop.
waker: Waker,
// Sender side of the control channel read by the runtime thread.
ctrl_tx: chan::Sender<Ctrl>,
// Join handle of the runtime thread.
join_handle: thread::JoinHandle<()>,
}
impl Runtime {
    /// Spawns the runtime thread (named `erin_runtime`) and returns the
    /// handle used to control it.
    ///
    /// # Errors
    /// Returns the I/O error if the runtime waker cannot be created or the
    /// thread cannot be spawned.
    pub fn start() -> Result<Runtime, io::Error> {
        // Every sender pushes its control message *before* waking the
        // runtime, and the runtime only drains the channel with `try_recv`
        // after it has been woken. With a zero-capacity (rendezvous) channel
        // the `send` would block waiting for a receiver that never shows up,
        // so the wake-up would never be issued. The channel must buffer.
        let (ctrl_tx, ctrl_rx) = chan::unbounded();
        let mut sources = popol::Sources::new();
        let waker = Waker::new(&mut sources, Source::RuntimeWaker)?;
        let join_handle = thread::Builder::new()
            .name("erin_runtime".into())
            .spawn(move || run(ctrl_rx, sources))?;
        Ok(Runtime {
            waker,
            ctrl_tx,
            join_handle,
        })
    }

    /// Hands `process` to the runtime and waits for the runtime's reply.
    ///
    /// # Errors
    /// * `Error::RuntimeProcessDied` if the control channel or the reply
    ///   channel is closed.
    /// * `Error::SetupFailed` if the runtime reports that setup failed.
    pub fn add_process(&self, process: Box<dyn Process>) -> Result<ProcessHandle, Error> {
        // Capacity 1 so the runtime can post its single reply without
        // waiting for this thread to reach `recv`.
        let (waker_tx, waker_rx) = chan::bounded(1);
        self.ctrl_tx
            .send(Ctrl::NewProcess { process, waker_tx })
            .map_err(|_| Error::RuntimeProcessDied)?;
        self.waker.wake();
        let proc_waker = waker_rx
            .recv()
            .map_err(|_| Error::RuntimeProcessDied)?
            .map_err(|()| Error::SetupFailed)?;
        Ok(ProcessHandle {
            proc_waker,
            rt_ctrl_tx: self.ctrl_tx.clone(),
            rt_waker: self.waker.clone(),
        })
    }

    /// Sends the shutdown message, wakes the runtime and joins its thread.
    ///
    /// # Errors
    /// * `Error::RuntimeProcessDied` if the control channel is closed.
    /// * `Error::Thread` if joining the runtime thread fails.
    pub fn shutdown(self) -> Result<(), Error> {
        self.ctrl_tx
            .send(Ctrl::Shutdown)
            .map_err(|_| Error::RuntimeProcessDied)?;
        self.waker.wake();
        self.join_handle.join().map_err(Error::Thread)?;
        Ok(())
    }
}
/// Key under which an entry is registered in the runtime's poll sources.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Source {
/// The waker that targets the runtime loop itself.
RuntimeWaker,
/// An I/O source registered by a process.
Io {
/// Owning process.
pid: ProcessId,
/// Token handed back to the process on registration.
token: IoToken,
},
/// The waker that targets a specific process.
ProcWaker {
/// Process the waker belongs to.
pid: ProcessId,
},
}
impl Source {
    /// Returns the process that owns this source, or `None` for the
    /// runtime's own waker.
    fn pid(self) -> Option<ProcessId> {
        match self {
            Source::Io { pid, .. } | Source::ProcWaker { pid } => Some(pid),
            Source::RuntimeWaker => None,
        }
    }
}
/// Counter that issues tokens of type `T` from an increasing `usize`.
struct TokenTally<T> {
// Raw value of the next token to issue.
tally: usize,
// Ties the counter to the token type without storing a `T`.
_pd: PhantomData<T>,
}
impl<T: PrivateFrom<usize>> TokenTally<T> {
    /// Creates a tally whose first issued token wraps the value 1, so the
    /// zero value is never issued.
    fn new() -> TokenTally<T> {
        TokenTally {
            tally: 1,
            _pd: PhantomData,
        }
    }

    /// Issues the next token and advances the counter.
    ///
    /// # Panics
    /// Panics with "token overflow" once the counter has reached
    /// `usize::max_value()`.
    fn next(&mut self) -> T {
        let next = self.tally;
        // Check before incrementing: otherwise the increment itself overflows
        // first (arithmetic panic in checked builds, silent wrap to 0 in
        // release) and the intended message is never the one reported.
        assert_ne!(next, usize::max_value(), "token overflow");
        self.tally = next + 1;
        T::from(next)
    }
}
/// Key under which a timer is stored in the timeout manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TimerKey {
// Process that set the timer.
pid: ProcessId,
// Token handed back to the process when the timer was set.
token: TimerToken,
}
/// Control message sent to the runtime thread.
enum Ctrl {
/// Add a new process to the runtime.
NewProcess {
/// The process to set up and run.
process: Box<dyn Process>,
/// Reply channel: the process waker on success, `Err(())` if setup failed.
waker_tx: chan::Sender<Result<Waker, ()>>,
},
/// Shut down all processes and stop the runtime loop.
Shutdown,
}
fn run(
ctrl_rx: chan::Receiver<Ctrl>,
sources: popol::Sources<Source>,
) {
info!("Initializing service..");
let mut sources = sources;
let mut pid_tally = 0;
let mut processes = HashMap::<ProcessId, (Box<dyn Process>, HashSet<IoToken>)>::new();
let mut io_token_tally = TokenTally::new();
let mut timeout_mgr = TimeoutManager::new();
let mut timer_token_tally = TokenTally::new();
let mut poll_events = Vec::<popol::Event<Source>>::with_capacity(32);
let mut timeouts = Vec::<TimerKey>::with_capacity(32);
let mut dead_procs = Vec::new();
loop {
let timeout = timeout_mgr
.next(SystemTime::now())
.unwrap_or(WAIT_TIMEOUT)
.into();
trace!(
"Polling {} source(s) and {} timeout(s), waking up in {:?}..",
sources.len(), timeout_mgr.len(), timeout,
);
poll_events.clear();
let ret = sources.wait_timeout(&mut poll_events, timeout); if let Err(err) = ret {
if err.kind() != io::ErrorKind::TimedOut {
error!("popol returned an error: {:?}", err);
return;
}
}
timeouts.clear();
timeout_mgr.wake(SystemTime::now().into(), &mut timeouts);
if poll_events.is_empty() && timeouts.is_empty() {
continue;
}
if poll_events.iter().any(|e| e.key == Source::RuntimeWaker) {
while let Ok(ctrl) = ctrl_rx.try_recv() {
match ctrl {
Ctrl::NewProcess { mut process, waker_tx } => {
let pid = pid_tally;
pid_tally += 1;
let mut io_tokens = HashSet::new();
let handle = RuntimeHandle {
pid: pid,
sources: RefCell::new(&mut sources),
io_tokens: RefCell::new(&mut io_tokens),
io_token_tally: RefCell::new(&mut io_token_tally),
timeout_mgr: RefCell::new(&mut timeout_mgr),
timer_token_tally: RefCell::new(&mut timer_token_tally),
waker_src: Source::ProcWaker { pid },
};
let ret = if process.setup(&handle).is_ok() {
processes.insert(pid, (process, io_tokens));
let waker = Waker::new(&mut sources, Source::ProcWaker { pid })
.expect("failed to create waker");
Ok(waker)
} else {
error!("Setup method of new process errored. Not adding.");
Err(())
};
if let Err(_) = waker_tx.send(ret) {
error!(
"User sent new process (pid {}) and hung up on response channel.",
pid,
);
}
}
Ctrl::Shutdown => {
info!("Shutdown signal received, shutting down processes...");
for (pid, (proc, _)) in processes.iter_mut() {
trace!("Shutting down process with pid {}", pid);
proc.shutdown();
}
info!("Shutdown complete");
return;
}
}
}
}
trace!(
"Woke up with {} I/O source(s) ready and {} timers expired",
poll_events.len(), timeouts.len(),
);
for (pid, (proc, io_tokens)) in processes.iter_mut() {
let has_poll = poll_events.iter().any(|e| e.key.pid() == Some(*pid));
let has_timer = timeouts.iter().any(|t| t.pid == *pid);
if !has_poll && !has_timer {
continue;
}
let handle = RuntimeHandle {
pid: *pid,
sources: RefCell::new(&mut sources),
io_tokens: RefCell::new(io_tokens),
io_token_tally: RefCell::new(&mut io_token_tally),
timeout_mgr: RefCell::new(&mut timeout_mgr),
timer_token_tally: RefCell::new(&mut timer_token_tally),
waker_src: Source::ProcWaker { pid: *pid },
};
let ev = Events {
pid: *pid,
poll: &poll_events,
timeouts: &timeouts,
};
if proc.wakeup(&handle, ev).is_err() {
dead_procs.push(*pid);
}
}
for pid in dead_procs.drain(..) {
let (_proc, io_tokens) = processes.remove(&pid).unwrap();
for token in io_tokens {
sources.unregister(&Source::Io { pid, token });
}
sources.unregister(&Source::ProcWaker { pid });
timeout_mgr.retain_by_key(|k| k.pid != pid);
}
}
}