1use std::{io, thread};
7use std::cell::RefCell;
8use std::collections::{HashMap, HashSet};
9use std::marker::PhantomData;
10use std::os::unix::io::AsRawFd;
11use std::time::{Duration, SystemTime};
12
13use crossbeam_channel as chan;
14use log::*;
15use popol;
16
17use crate::{Process, Waker};
18use crate::error::Error;
19use crate::timeouts::TimeoutManager;
20
21pub use popol::{Interest};
22
/// Longest single wait (one hour) the runtime loop falls back to when the
/// timeout manager reports no upcoming timeout.
const WAIT_TIMEOUT: Duration = Duration::from_secs(3_600);

/// Identifier the runtime hands out to every process it hosts.
type ProcessId = usize;
28
/// Opaque identifier of an I/O source registration, as returned by
/// `RuntimeHandle::register_io`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct IoToken(usize);

impl IoToken {
    /// Placeholder token with raw value `0`.
    pub const NULL: Self = Self(0);
}

impl Default for IoToken {
    /// The default token is [`IoToken::NULL`].
    fn default() -> Self {
        Self::NULL
    }
}
40
/// Opaque identifier of a timer, as returned by `RuntimeHandle::set_alarm`
/// and `RuntimeHandle::set_timer`.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct TimerToken(usize);

impl TimerToken {
    /// Placeholder token with raw value `0`.
    pub const NULL: Self = Self(0);
}

impl Default for TimerToken {
    /// The default token is [`TimerToken::NULL`].
    fn default() -> Self {
        Self::NULL
    }
}
52
53trait PrivateFrom<T> {
55 fn from(_: T) -> Self;
56}
57
58impl PrivateFrom<usize> for IoToken {
59 fn from(v: usize) -> IoToken { IoToken(v) }
60}
61
62impl PrivateFrom<usize> for TimerToken {
63 fn from(v: usize) -> TimerToken { TimerToken(v) }
64}
65
/// An I/O readiness notification delivered to a process.
#[derive(Clone, Copy, Debug)]
pub struct IoEvent {
    /// Token of the registration the event belongs to.
    pub token: IoToken,
    /// The `popol` source value copied from the underlying poll event.
    pub src: popol::Source,
}
71
72#[derive(Clone, Copy, Debug)]
73pub enum Event {
74 Io(IoEvent),
75 Timer(TimerToken),
76 Waker,
77}
78
79impl Event {
80 pub fn io(self) -> Option<IoEvent> {
81 if let Event::Io(e) = self {
82 Some(e)
83 } else {
84 None
85 }
86 }
87
88 pub fn timer(self) -> Option<TimerToken> {
89 if let Event::Timer(t) = self {
90 Some(t)
91 } else {
92 None
93 }
94 }
95
96 pub fn is_waker(self) -> bool {
97 if let Event::Waker = self {
98 true
99 } else {
100 false
101 }
102 }
103}
104
105pub struct EventsIter<'a> {
107 events: &'a Events<'a>,
108 include_io: bool,
109 include_timers: bool,
110 idx: usize,
111}
112
113impl<'a> Iterator for EventsIter<'a> {
114 type Item = Event;
115
116 fn next(&mut self) -> Option<Self::Item> {
117 loop {
118 if self.idx < self.events.poll.len() {
119 let pidx = self.idx;
120 self.idx += 1;
121 if !self.include_io {
122 continue;
123 }
124 let poll = self.events.poll.get(pidx).unwrap();
125 let ev = match poll.key {
126 Source::RuntimeWaker => None,
127 Source::ProcWaker { pid: s_pid } if s_pid == self.events.pid => {
128 Some(Event::Waker)
129 }
130 Source::ProcWaker { .. } => None,
131 Source::Io { pid: s_pid, token } if s_pid == self.events.pid => {
132 Some(Event::Io(IoEvent { token, src: poll.source }))
133 },
134 Source::Io { .. } => None,
135 };
136 if let Some(ev) = ev {
137 return Some(ev);
138 } else {
139 continue;
140 }
141 }
142 if self.idx < self.events.poll.len() + self.events.timeouts.len() {
143 let tidx = self.idx - self.events.poll.len();
144 self.idx += 1;
145 if !self.include_timers {
146 continue;
147 }
148 let timeout = self.events.timeouts.get(tidx).unwrap();
149 if timeout.pid == self.events.pid {
150 return Some(Event::Timer(timeout.token));
151 } else {
152 continue;
153 }
154 }
155 return None;
156 }
157 }
158}
159
160#[derive(Clone)]
162pub struct Events<'a> {
163 pid: ProcessId,
164 poll: &'a Vec<popol::Event<Source>>,
165 timeouts: &'a Vec<TimerKey>,
166}
167
168impl<'a> Events<'a> {
169 pub fn iter(&'a self) -> EventsIter<'a> {
171 EventsIter {
172 events: self,
173 include_io: true,
174 include_timers: true,
175 idx: 0,
176 }
177 }
178
179 pub fn len(&self) -> usize {
184 self.iter().count()
185 }
186
187 pub fn io(&'a self) -> impl Iterator<Item = IoEvent> + 'a {
189 EventsIter {
190 events: self,
191 include_io: true,
192 include_timers: false,
193 idx: 0,
194 }.map(|e| e.io().unwrap())
195 }
196
197 pub fn timers(&'a self) -> impl Iterator<Item = TimerToken> + 'a {
199 EventsIter {
200 events: self,
201 include_io: false,
202 include_timers: true,
203 idx: 0,
204 }.map(|e| e.timer().unwrap())
205 }
206
207 pub fn waker(&self) -> bool {
209 self.poll.iter().any(|e| match e.key {
210 Source::ProcWaker { pid } => pid == self.pid,
211 Source::RuntimeWaker => false,
212 Source::Io { .. } => false
213 })
214 }
215}
216
217impl<'a, 's: 'a> IntoIterator for &'s Events<'a> {
218 type Item = Event;
219 type IntoIter = EventsIter<'a>;
220
221 fn into_iter(self) -> Self::IntoIter {
222 self.iter()
223 }
224}
225
226pub struct RuntimeHandle<'a> {
229 pid: ProcessId,
230
231 sources: RefCell<&'a mut popol::Sources<Source>>,
233 io_tokens: RefCell<&'a mut HashSet<IoToken>>,
234 io_token_tally: RefCell<&'a mut TokenTally<IoToken>>,
235 timeout_mgr: RefCell<&'a mut TimeoutManager<TimerKey>>,
236 timer_token_tally: RefCell<&'a mut TokenTally<TimerToken>>,
237 waker_src: Source,
238}
239
240impl<'a> RuntimeHandle<'a> {
242 pub fn register_io(&self, fd: &impl AsRawFd, events: Interest) -> IoToken {
246 let token = self.io_token_tally.borrow_mut().next();
247 let src = Source::Io { pid: self.pid, token };
248 self.sources.borrow_mut().register(src.clone(), fd, events);
249 self.io_tokens.borrow_mut().insert(token);
250 token
251 }
252
253 pub fn reregister_io(&self, token: IoToken, events: Interest) {
255 let src = Source::Io { pid: self.pid, token };
256 self.sources.borrow_mut().set(&src, events);
257 }
258
259 pub fn unregister_io(&self, token: IoToken) {
261 let src = Source::Io { pid: self.pid, token };
262 self.sources.borrow_mut().unregister(&src);
263 self.io_tokens.borrow_mut().remove(&token);
264 }
265
266 pub fn set_alarm(&self, time: SystemTime) -> TimerToken {
268 let token = self.timer_token_tally.borrow_mut().next();
269 let key = TimerKey {
270 pid: self.pid,
271 token: token,
272 };
273 self.timeout_mgr.borrow_mut().register(key, time);
274 token
275 }
276
277 pub fn set_timer(&self, timer: Duration) -> TimerToken {
279 self.set_alarm(SystemTime::now().checked_add(timer).expect("time overflow"))
280 }
281
282 pub fn cancel_timer(&self, token: TimerToken) {
284 let key = TimerKey {
285 pid: self.pid,
286 token: token,
287 };
288 self.timeout_mgr.borrow_mut().unregister(key);
289 }
290
291 pub fn cancel_all_timers(&self) {
293 self.timeout_mgr.borrow_mut().retain_by_key(|k| k.pid != self.pid);
294 }
295
296 pub fn new_waker(&self) -> Waker {
297 Waker::new(&mut *self.sources.borrow_mut(), self.waker_src)
298 .expect("failed to create waker")
299 }
300}
301
302#[derive(Clone)]
304pub struct ProcessHandle {
305 proc_waker: Waker,
306 rt_waker: Waker,
307 rt_ctrl_tx: chan::Sender<Ctrl>,
308}
309
310impl ProcessHandle {
311 pub fn wake(&self) -> Result<(), io::Error> {
313 self.proc_waker.wake();
314 Ok(())
315 }
316
317 pub fn shutdown(&self) -> Result<(), Error> {
319 self.rt_ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
320 self.rt_waker.wake();
321 Ok(())
322 }
323
324 pub fn clone_waker(&self) -> Waker {
331 self.proc_waker.clone()
332 }
333
334 pub fn into_waker(self) -> Waker {
337 self.proc_waker
338 }
339}
340
341pub struct Runtime {
342 waker: Waker,
343 ctrl_tx: chan::Sender<Ctrl>,
344 join_handle: thread::JoinHandle<()>,
345}
346
347impl Runtime {
348 pub fn start() -> Result<Runtime, io::Error> {
349 let (ctrl_tx, ctrl_rx) = chan::bounded(0);
350
351 let mut sources = popol::Sources::new();
352 let waker = Waker::new(&mut sources, Source::RuntimeWaker)?;
353 let jh = thread::Builder::new()
354 .name("erin_runtime".into())
355 .spawn(|| {
356 run(ctrl_rx, sources);
357 })?;
358
359 Ok(Runtime {
360 waker: waker,
361 ctrl_tx: ctrl_tx,
362 join_handle: jh,
363 })
364 }
365
366 pub fn add_process(&self, process: Box<dyn Process>) -> Result<ProcessHandle, Error> {
376 let (waker_tx, waker_rx) = chan::bounded(1);
378 self.ctrl_tx.send(Ctrl::NewProcess { process, waker_tx })
379 .map_err(|_| Error::RuntimeProcessDied)?;
380
381 self.waker.wake();
383
384 let proc_waker = waker_rx.recv()
385 .map_err(|_| Error::RuntimeProcessDied)?
386 .map_err(|()| Error::SetupFailed)?;
387 Ok(ProcessHandle {
388 proc_waker: proc_waker,
389 rt_ctrl_tx: self.ctrl_tx.clone(),
390 rt_waker: self.waker.clone(),
391 })
392 }
393
394 pub fn shutdown(self) -> Result<(), Error> {
399 self.ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
400 self.waker.wake();
401 self.join_handle.join().map_err(Error::Thread)?;
402 Ok(())
403 }
404}
405
406#[derive(Debug, PartialEq, Eq, Clone, Copy)]
407enum Source {
408 RuntimeWaker,
409 Io {
410 pid: ProcessId,
411 token: IoToken,
412 },
413 ProcWaker {
414 pid: ProcessId,
415 },
416}
417
418impl Source {
419 fn pid(self) -> Option<ProcessId> {
420 match self {
421 Source::RuntimeWaker => None,
422 Source::Io { pid, .. } => Some(pid),
423 Source::ProcWaker { pid, .. } => Some(pid),
424 }
425 }
426}
427
428struct TokenTally<T> {
432 tally: usize,
433 _pd: PhantomData<T>,
434}
435
436impl<T: PrivateFrom<usize>> TokenTally<T> {
437 fn new() -> TokenTally<T> {
439 TokenTally {
440 tally: 1,
441 _pd: PhantomData,
442 }
443 }
444
445 fn next(&mut self) -> T {
447 let next = self.tally;
448 self.tally += 1;
449 assert_ne!(next, usize::max_value(), "token overflow");
450 T::from(next)
451 }
452}
453
454#[derive(Debug, Clone, Copy, PartialEq, Eq)]
456struct TimerKey {
457 pid: ProcessId,
458 token: TimerToken,
459}
460
/// Control messages sent to the runtime thread over its control channel.
enum Ctrl {
    /// Add a new process to the runtime.
    NewProcess {
        /// The process to set up and run.
        process: Box<dyn Process>,
        /// Channel on which the runtime answers: the process' waker when
        /// setup succeeded, `Err(())` when it failed.
        waker_tx: chan::Sender<Result<Waker, ()>>,
    },
    /// Shut down all processes and stop the runtime loop.
    Shutdown,
}
470
471fn run(
473 ctrl_rx: chan::Receiver<Ctrl>,
474 sources: popol::Sources<Source>,
475) {
476 info!("Initializing service..");
477
478 let mut sources = sources;
480 let mut pid_tally = 0;
481 let mut processes = HashMap::<ProcessId, (Box<dyn Process>, HashSet<IoToken>)>::new();
482 let mut io_token_tally = TokenTally::new();
483 let mut timeout_mgr = TimeoutManager::new();
484 let mut timer_token_tally = TokenTally::new();
485
486 let mut poll_events = Vec::<popol::Event<Source>>::with_capacity(32);
488 let mut timeouts = Vec::<TimerKey>::with_capacity(32);
489 let mut dead_procs = Vec::new();
490
491 loop {
492 let timeout = timeout_mgr
493 .next(SystemTime::now())
494 .unwrap_or(WAIT_TIMEOUT)
495 .into();
496
497 trace!(
498 "Polling {} source(s) and {} timeout(s), waking up in {:?}..",
499 sources.len(), timeout_mgr.len(), timeout,
500 );
501
502 poll_events.clear();
503 let ret = sources.wait_timeout(&mut poll_events, timeout); if let Err(err) = ret {
505 if err.kind() != io::ErrorKind::TimedOut {
506 error!("popol returned an error: {:?}", err);
507 return;
508 }
509 }
510
511 timeouts.clear();
512 timeout_mgr.wake(SystemTime::now().into(), &mut timeouts);
513
514 if poll_events.is_empty() && timeouts.is_empty() {
515 continue;
516 }
517
518 if poll_events.iter().any(|e| e.key == Source::RuntimeWaker) {
519 while let Ok(ctrl) = ctrl_rx.try_recv() {
520 match ctrl {
521 Ctrl::NewProcess { mut process, waker_tx } => {
522 let pid = pid_tally;
523 pid_tally += 1;
524
525 let mut io_tokens = HashSet::new();
528 let handle = RuntimeHandle {
529 pid: pid,
530 sources: RefCell::new(&mut sources),
531 io_tokens: RefCell::new(&mut io_tokens),
532 io_token_tally: RefCell::new(&mut io_token_tally),
533 timeout_mgr: RefCell::new(&mut timeout_mgr),
534 timer_token_tally: RefCell::new(&mut timer_token_tally),
535 waker_src: Source::ProcWaker { pid },
536 };
537 let ret = if process.setup(&handle).is_ok() {
538 processes.insert(pid, (process, io_tokens));
539 let waker = Waker::new(&mut sources, Source::ProcWaker { pid })
540 .expect("failed to create waker");
541 Ok(waker)
542 } else {
543 error!("Setup method of new process errored. Not adding.");
544 Err(())
545 };
546 if let Err(_) = waker_tx.send(ret) {
547 error!(
548 "User sent new process (pid {}) and hung up on response channel.",
549 pid,
550 );
551 }
552 }
553 Ctrl::Shutdown => {
554 info!("Shutdown signal received, shutting down processes...");
555 for (pid, (proc, _)) in processes.iter_mut() {
556 trace!("Shutting down process with pid {}", pid);
557 proc.shutdown();
558 }
559 info!("Shutdown complete");
560 return;
561 }
562 }
563 }
564 }
565
566 trace!(
567 "Woke up with {} I/O source(s) ready and {} timers expired",
568 poll_events.len(), timeouts.len(),
569 );
570
571 for (pid, (proc, io_tokens)) in processes.iter_mut() {
572 let has_poll = poll_events.iter().any(|e| e.key.pid() == Some(*pid));
573 let has_timer = timeouts.iter().any(|t| t.pid == *pid);
574 if !has_poll && !has_timer {
575 continue;
576 }
577 let handle = RuntimeHandle {
578 pid: *pid,
579 sources: RefCell::new(&mut sources),
580 io_tokens: RefCell::new(io_tokens),
581 io_token_tally: RefCell::new(&mut io_token_tally),
582 timeout_mgr: RefCell::new(&mut timeout_mgr),
583 timer_token_tally: RefCell::new(&mut timer_token_tally),
584 waker_src: Source::ProcWaker { pid: *pid },
585 };
586 let ev = Events {
587 pid: *pid,
588 poll: &poll_events,
589 timeouts: &timeouts,
590 };
591 if proc.wakeup(&handle, ev).is_err() {
592 dead_procs.push(*pid);
593 }
594 }
595 for pid in dead_procs.drain(..) {
596 let (_proc, io_tokens) = processes.remove(&pid).unwrap();
597 for token in io_tokens {
599 sources.unregister(&Source::Io { pid, token });
600 }
601 sources.unregister(&Source::ProcWaker { pid });
603 timeout_mgr.retain_by_key(|k| k.pid != pid);
605 }
606 }
607}