erin/
runtime.rs

1//! Poll-based runtime. This is a single-threaded runtime using a `poll` loop.
2//!
//! TODO(stevenroose) in theory we could have identified wakers too and allow processes to
//! create new wakers and be notified of which waker was used
5
6use std::{io, thread};
7use std::cell::RefCell;
8use std::collections::{HashMap, HashSet};
9use std::marker::PhantomData;
10use std::os::unix::io::AsRawFd;
11use std::time::{Duration, SystemTime};
12
13use crossbeam_channel as chan;
14use log::*;
15use popol;
16
17use crate::{Process, Waker};
18use crate::error::Error;
19use crate::timeouts::TimeoutManager;
20
21pub use popol::{Interest};
22
/// Maximum amount of time to wait for I/O.
///
/// The runtime loop falls back to this poll timeout when the timeout
/// manager does not report an upcoming timer.
const WAIT_TIMEOUT: Duration = Duration::from_secs(60 * 60);
25
/// Type to identify processes added to the runtime.
///
/// The runtime loop hands these out sequentially, starting at 0.
type ProcessId = usize;
28
/// Opaque identifier for an I/O source registered with the runtime.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct IoToken(usize);

impl IoToken {
	/// Sentinel token that is guaranteed to never identify a real source.
	pub const NULL: Self = Self(0);
}

impl Default for IoToken {
	/// The default token is the [IoToken::NULL] sentinel.
	fn default() -> Self {
		Self::NULL
	}
}
40
/// Opaque identifier for a timer or alarm set through the runtime.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct TimerToken(usize);

impl TimerToken {
	/// Sentinel token that is guaranteed to never identify a real timer.
	pub const NULL: Self = Self(0);
}

impl Default for TimerToken {
	/// The default token is the [TimerToken::NULL] sentinel.
	fn default() -> Self {
		Self::NULL
	}
}
52
53/// Identical to [From] but only usable within the crate.
54trait PrivateFrom<T> {
55	fn from(_: T) -> Self;
56}
57
58impl PrivateFrom<usize> for IoToken {
59	fn from(v: usize) -> IoToken { IoToken(v) }
60}
61
62impl PrivateFrom<usize> for TimerToken {
63	fn from(v: usize) -> TimerToken { TimerToken(v) }
64}
65
/// An I/O event for a source registered by a process.
#[derive(Clone, Copy, Debug)]
pub struct IoEvent {
	/// Token identifying the registered source the event belongs to.
	pub token: IoToken,
	/// The `popol` source data reported by the poll for this event.
	pub src: popol::Source,
}
71
72#[derive(Clone, Copy, Debug)]
73pub enum Event {
74	Io(IoEvent),
75	Timer(TimerToken),
76	Waker,
77}
78
79impl Event {
80	pub fn io(self) -> Option<IoEvent> {
81		if let Event::Io(e) = self {
82			Some(e)
83		} else {
84			None
85		}
86	}
87
88	pub fn timer(self) -> Option<TimerToken> {
89		if let Event::Timer(t) = self {
90			Some(t)
91		} else {
92			None
93		}
94	}
95
96	pub fn is_waker(self) -> bool {
97		if let Event::Waker = self {
98			true
99		} else {
100			false
101		}
102	}
103}
104
/// Used as return type in various [Events] methods.
///
/// Walks the poll events first and the expired timers second, yielding only
/// the entries that belong to the process of `events`.
pub struct EventsIter<'a> {
	/// The event set being iterated.
	events: &'a Events<'a>,
	/// Whether entries from the poll list (I/O and waker events) are yielded.
	include_io: bool,
	/// Whether expired timers are yielded.
	include_timers: bool,
	/// Cursor that runs over the poll list and then over the timeout list.
	idx: usize,
}
112
113impl<'a> Iterator for EventsIter<'a> {
114	type Item = Event;
115
116	fn next(&mut self) -> Option<Self::Item> {
117		loop {
118			if self.idx < self.events.poll.len() {
119				let pidx = self.idx;
120				self.idx += 1;
121				if !self.include_io {
122					continue;
123				}
124				let poll = self.events.poll.get(pidx).unwrap();
125				let ev = match poll.key {
126					Source::RuntimeWaker => None,
127					Source::ProcWaker { pid: s_pid } if s_pid == self.events.pid => {
128						Some(Event::Waker)
129					}
130					Source::ProcWaker { .. } => None,
131					Source::Io { pid: s_pid, token } if s_pid == self.events.pid => {
132						Some(Event::Io(IoEvent { token, src: poll.source }))
133					},
134					Source::Io { .. } => None,
135				};
136				if let Some(ev) = ev {
137					return Some(ev);
138				} else {
139					continue;
140				}
141			}
142			if self.idx < self.events.poll.len() + self.events.timeouts.len() {
143				let tidx = self.idx - self.events.poll.len();
144				self.idx += 1;
145				if !self.include_timers {
146					continue;
147				}
148				let timeout = self.events.timeouts.get(tidx).unwrap();
149				if timeout.pid == self.events.pid {
150					return Some(Event::Timer(timeout.token));
151				} else {
152					continue;
153				}
154			}
155			return None;
156		}
157	}
158}
159
/// Get access to events relevant for the process.
///
/// Borrows the runtime loop's buffers, which hold the events of all
/// processes; the accessors filter on `pid`.
#[derive(Clone)]
pub struct Events<'a> {
	/// The process these events are presented to.
	pid: ProcessId,
	/// Poll events collected in the current loop iteration.
	poll: &'a Vec<popol::Event<Source>>,
	/// Timers that expired in the current loop iteration.
	timeouts: &'a Vec<TimerKey>,
}
167
168impl<'a> Events<'a> {
169	/// Iterate all events.
170	pub fn iter(&'a self) -> EventsIter<'a> {
171		EventsIter {
172			events: self,
173			include_io: true,
174			include_timers: true,
175			idx: 0,
176		}
177	}
178
179	/// Get the number of events.
180	///
181	/// Note that this method has the same cost as iterating through
182	/// [Events::iter].
183	pub fn len(&self) -> usize {
184		self.iter().count()
185	}
186
187	/// Iterate only I/O events.
188	pub fn io(&'a self) -> impl Iterator<Item = IoEvent> + 'a {
189		EventsIter {
190			events: self,
191			include_io: true,
192			include_timers: false,
193			idx: 0,
194		}.map(|e| e.io().unwrap())
195	}
196
197	/// Iterate only timer events.
198	pub fn timers(&'a self) -> impl Iterator<Item = TimerToken> + 'a {
199		EventsIter {
200			events: self,
201			include_io: false,
202			include_timers: true,
203			idx: 0,
204		}.map(|e| e.timer().unwrap())
205	}
206
207	/// Check if our waker has been called.
208	pub fn waker(&self) -> bool {
209		self.poll.iter().any(|e| match e.key {
210			Source::ProcWaker { pid } => pid == self.pid,
211			Source::RuntimeWaker => false,
212			Source::Io { .. } => false
213		})
214	}
215}
216
217impl<'a, 's: 'a> IntoIterator for &'s Events<'a> {
218	type Item = Event;
219	type IntoIter = EventsIter<'a>;
220
221	fn into_iter(self) -> Self::IntoIter {
222		self.iter()
223	}
224}
225
/// Passed to a process on wakeup. Used to register and unregister new
/// I/O sources and timers.
///
/// All state is borrowed from the runtime loop for the duration of one
/// callback; the `RefCell`s allow mutation through the `&self` methods.
pub struct RuntimeHandle<'a> {
	/// The process this handle acts on behalf of.
	pid: ProcessId,

	// Main thread variables.
	/// The poll sources shared by all processes.
	sources: RefCell<&'a mut popol::Sources<Source>>,
	/// I/O tokens currently registered by this process.
	io_tokens: RefCell<&'a mut HashSet<IoToken>>,
	/// Generator for new I/O tokens.
	io_token_tally: RefCell<&'a mut TokenTally<IoToken>>,
	/// The timer bookkeeping shared by all processes.
	timeout_mgr: RefCell<&'a mut TimeoutManager<TimerKey>>,
	/// Generator for new timer tokens.
	timer_token_tally: RefCell<&'a mut TokenTally<TimerToken>>,
	/// Poll key under which wakers for this process are registered.
	waker_src: Source,
}
239
240/// The `R` parameter represents the underlying stream type, eg. `net::TcpStream`.
241impl<'a> RuntimeHandle<'a> {
242	/// Register a new I/O source.
243	///
244	/// To change the interest of a source, use [reregister_io] instead.
245	pub fn register_io(&self, fd: &impl AsRawFd, events: Interest) -> IoToken {
246		let token = self.io_token_tally.borrow_mut().next();
247		let src = Source::Io { pid: self.pid, token };
248		self.sources.borrow_mut().register(src.clone(), fd, events);
249		self.io_tokens.borrow_mut().insert(token);
250		token
251	}
252
253	/// Change the events you are interested in for this I/O source.
254	pub fn reregister_io(&self, token: IoToken, events: Interest) {
255		let src = Source::Io { pid: self.pid, token };
256		self.sources.borrow_mut().set(&src, events);
257	}
258
259	/// Stop receiving eventsd for this I/O source.
260	pub fn unregister_io(&self, token: IoToken) {
261		let src = Source::Io { pid: self.pid, token };
262		self.sources.borrow_mut().unregister(&src);
263		self.io_tokens.borrow_mut().remove(&token);
264	}
265
266	/// Set a timer to wake you up at the given time.
267	pub fn set_alarm(&self, time: SystemTime) -> TimerToken {
268		let token = self.timer_token_tally.borrow_mut().next();
269		let key = TimerKey {
270			pid: self.pid,
271			token: token,
272		};
273		self.timeout_mgr.borrow_mut().register(key, time);
274		token
275	}
276
277	/// Set a timer to be woken up after the given duration.
278	pub fn set_timer(&self, timer: Duration) -> TimerToken {
279		self.set_alarm(SystemTime::now().checked_add(timer).expect("time overflow"))
280	}
281
282	/// Cancel previously set timer or alarm.
283	pub fn cancel_timer(&self, token: TimerToken) {
284		let key = TimerKey {
285			pid: self.pid,
286			token: token,
287		};
288		self.timeout_mgr.borrow_mut().unregister(key);
289	}
290
291	/// Cancel all previously set timers and alarms.
292	pub fn cancel_all_timers(&self) {
293		self.timeout_mgr.borrow_mut().retain_by_key(|k| k.pid != self.pid);
294	}
295
296	pub fn new_waker(&self) -> Waker {
297		Waker::new(&mut *self.sources.borrow_mut(), self.waker_src)
298			.expect("failed to create waker")
299	}
300}
301
/// A handle to a process running in an erin [Runtime].
#[derive(Clone)]
pub struct ProcessHandle {
	/// Waker for the process itself.
	proc_waker: Waker,
	/// Waker for the runtime loop, used so it notices control messages.
	rt_waker: Waker,
	/// Sending side of the runtime's control channel.
	rt_ctrl_tx: chan::Sender<Ctrl>,
}
309
310impl ProcessHandle {
311	/// Wake up the process.
312	pub fn wake(&self) -> Result<(), io::Error> {
313		self.proc_waker.wake();
314		Ok(())
315	}
316
317	/// Trigger shutdown of the process.
318	pub fn shutdown(&self) -> Result<(), Error> {
319		self.rt_ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
320		self.rt_waker.wake();
321		Ok(())
322	}
323
324	/// Create a new waker that can be used to wake up the process.
325	///
326	/// Note that you can use the [wake] method directly. This method
327	/// allows you to create a more light-weight waker to pass around.
328	///
329	/// Use [into_waker] if you only want the waker and don't need the rest.
330	pub fn clone_waker(&self) -> Waker {
331		self.proc_waker.clone()
332	}
333
334	/// If you only need the waker part of the handle, you can use this
335	/// method to drop the other parts.
336	pub fn into_waker(self) -> Waker {
337		self.proc_waker
338	}
339}
340
/// Owner of the runtime thread; used to add processes and to shut the
/// runtime down.
pub struct Runtime {
	/// Waker for the runtime loop, used so it notices control messages.
	waker: Waker,
	/// Sending side of the control channel read by the runtime loop.
	ctrl_tx: chan::Sender<Ctrl>,
	/// Join handle of the thread running the runtime loop.
	join_handle: thread::JoinHandle<()>,
}
346
347impl Runtime {
348	pub fn start() -> Result<Runtime, io::Error> {
349		let (ctrl_tx, ctrl_rx) = chan::bounded(0);
350		
351		let mut sources = popol::Sources::new();
352		let waker = Waker::new(&mut sources, Source::RuntimeWaker)?;
353		let jh = thread::Builder::new()
354			.name("erin_runtime".into())
355			.spawn(|| {
356				run(ctrl_rx, sources);
357			})?;
358
359		Ok(Runtime {
360			waker: waker,
361			ctrl_tx: ctrl_tx,
362			join_handle: jh,
363		})
364	}
365
366	/// Adds a new process to the runtime.
367	///
368	/// Returns a [ProcessHandle] that can be used to wake up and shut down the
369	/// process.
370	///
371	/// NB This call depends on the runtime process waking up and responding to us.
372	/// While this should generally not be a possibility, it is important that no
373	/// calls to any of your [Process::wakeup] methods will be waiting on a lock
374	/// that is held when calling this method.
375	pub fn add_process(&self, process: Box<dyn Process>) -> Result<ProcessHandle, Error> {
376		// This will be the response channel where we will wait for a waker on.
377		let (waker_tx, waker_rx) = chan::bounded(1);
378		self.ctrl_tx.send(Ctrl::NewProcess { process, waker_tx })
379			.map_err(|_| Error::RuntimeProcessDied)?;
380
381		// Wake up the runtime thread that will likely be in a blocking call.
382		self.waker.wake();
383
384		let proc_waker = waker_rx.recv()
385			.map_err(|_| Error::RuntimeProcessDied)?
386			.map_err(|()| Error::SetupFailed)?;
387		Ok(ProcessHandle {
388			proc_waker: proc_waker,
389			rt_ctrl_tx: self.ctrl_tx.clone(),
390			rt_waker: self.waker.clone(),
391		})
392	}
393
394	/// Shut down the runtime.
395	///
396	/// This will call the [Process::shutdown] function on all processes
397	/// before returning.
398	pub fn shutdown(self) -> Result<(), Error> {
399		self.ctrl_tx.send(Ctrl::Shutdown).map_err(|_| Error::RuntimeProcessDied)?;
400		self.waker.wake();
401		self.join_handle.join().map_err(Error::Thread)?;
402		Ok(())
403	}
404}
405
406#[derive(Debug, PartialEq, Eq, Clone, Copy)]
407enum Source {
408	RuntimeWaker,
409	Io {
410		pid: ProcessId,
411		token: IoToken,
412	},
413	ProcWaker {
414		pid: ProcessId,
415	},
416}
417
418impl Source {
419	fn pid(self) -> Option<ProcessId> {
420		match self {
421			Source::RuntimeWaker => None,
422			Source::Io { pid, .. } => Some(pid),
423			Source::ProcWaker { pid, .. } => Some(pid),
424		}
425	}
426}
427
428/// Used to safely create new enumerated tokens.
429///
430/// This implementation never emits a token with integer value 0.
431struct TokenTally<T> {
432	tally: usize,
433	_pd: PhantomData<T>,
434}
435
436impl<T: PrivateFrom<usize>> TokenTally<T> {
437	/// Create a new [TokenTally] that starts with value 1.
438	fn new() -> TokenTally<T> {
439		TokenTally {
440			tally: 1,
441			_pd: PhantomData,
442		}
443	}
444
445	/// Get the next token to use.
446	fn next(&mut self) -> T {
447		let next = self.tally;
448		self.tally += 1;
449		assert_ne!(next, usize::max_value(), "token overflow");
450		T::from(next)
451	}
452}
453
/// Key used for the timeout manager.
///
/// Combines the owning process with its timer token.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TimerKey {
	/// Process that set the timer.
	pid: ProcessId,
	/// Token handed to the process when the timer was set.
	token: TimerToken,
}
460
/// Control messages sent to the runtime loop over its control channel.
enum Ctrl {
	/// Ask the runtime to set up and add a new process.
	NewProcess {
		/// The process to add.
		process: Box<dyn Process>,
		/// The channel where we are expected to reply with a waker
		/// for the new process.
		waker_tx: chan::Sender<Result<Waker, ()>>,
	},
	/// Ask the runtime to shut down all processes and exit its loop.
	Shutdown,
}
470
471/// The main run method for the runtime.
472fn run(
473	ctrl_rx: chan::Receiver<Ctrl>,
474	sources: popol::Sources<Source>,
475) {
476	info!("Initializing service..");
477
478	// Core variables.
479	let mut sources = sources;
480	let mut pid_tally = 0;
481	let mut processes = HashMap::<ProcessId, (Box<dyn Process>, HashSet<IoToken>)>::new();
482	let mut io_token_tally = TokenTally::new();
483	let mut timeout_mgr = TimeoutManager::new();
484	let mut timer_token_tally = TokenTally::new();
485
486	// Buffers to keep single allocation.
487	let mut poll_events = Vec::<popol::Event<Source>>::with_capacity(32);
488	let mut timeouts = Vec::<TimerKey>::with_capacity(32);
489	let mut dead_procs = Vec::new();
490
491	loop {
492		let timeout = timeout_mgr
493			.next(SystemTime::now())
494			.unwrap_or(WAIT_TIMEOUT)
495			.into();
496
497		trace!(
498			"Polling {} source(s) and {} timeout(s), waking up in {:?}..",
499			sources.len(), timeout_mgr.len(), timeout,
500		);
501
502		poll_events.clear();
503		let ret = sources.wait_timeout(&mut poll_events, timeout); // Blocking.
504		if let Err(err) = ret {
505			if err.kind() != io::ErrorKind::TimedOut {
506				error!("popol returned an error: {:?}", err);
507				return;
508			}
509		}
510
511		timeouts.clear();
512		timeout_mgr.wake(SystemTime::now().into(), &mut timeouts);
513
514		if poll_events.is_empty() && timeouts.is_empty() {
515			continue;
516		}
517
518		if poll_events.iter().any(|e| e.key == Source::RuntimeWaker) {
519			while let Ok(ctrl) = ctrl_rx.try_recv() {
520				match ctrl {
521					Ctrl::NewProcess { mut process, waker_tx } => {
522						let pid = pid_tally;
523						pid_tally += 1;
524
525						// Call the setup method on the process.
526						// Only really add the process if setup doesn't fail.
527						let mut io_tokens = HashSet::new();
528						let handle = RuntimeHandle {
529							pid: pid,
530							sources: RefCell::new(&mut sources),
531							io_tokens: RefCell::new(&mut io_tokens),
532							io_token_tally: RefCell::new(&mut io_token_tally),
533							timeout_mgr: RefCell::new(&mut timeout_mgr),
534							timer_token_tally: RefCell::new(&mut timer_token_tally),
535							waker_src: Source::ProcWaker { pid },
536						};
537						let ret = if process.setup(&handle).is_ok() {
538							processes.insert(pid, (process, io_tokens));
539							let waker = Waker::new(&mut sources, Source::ProcWaker { pid })
540								.expect("failed to create waker");
541							Ok(waker)
542						} else {
543							error!("Setup method of new process errored. Not adding.");
544							Err(())
545						};
546						if let Err(_) = waker_tx.send(ret) {
547							error!(
548								"User sent new process (pid {}) and hung up on response channel.",
549								pid,
550							);
551						}
552					}
553					Ctrl::Shutdown => {
554						info!("Shutdown signal received, shutting down processes...");
555						for (pid, (proc, _)) in processes.iter_mut() {
556							trace!("Shutting down process with pid {}", pid);
557							proc.shutdown();
558						}
559						info!("Shutdown complete");
560						return;
561					}
562				}
563			}
564		}
565
566		trace!(
567			"Woke up with {} I/O source(s) ready and {} timers expired",
568			poll_events.len(), timeouts.len(),
569		);
570
571		for (pid, (proc, io_tokens)) in processes.iter_mut() {
572			let has_poll = poll_events.iter().any(|e| e.key.pid() == Some(*pid));
573			let has_timer = timeouts.iter().any(|t| t.pid == *pid);
574			if !has_poll && !has_timer {
575				continue;
576			}
577			let handle = RuntimeHandle {
578				pid: *pid,
579				sources: RefCell::new(&mut sources),
580				io_tokens: RefCell::new(io_tokens),
581				io_token_tally: RefCell::new(&mut io_token_tally),
582				timeout_mgr: RefCell::new(&mut timeout_mgr),
583				timer_token_tally: RefCell::new(&mut timer_token_tally),
584				waker_src: Source::ProcWaker { pid: *pid },
585			};
586			let ev = Events {
587				pid: *pid,
588				poll: &poll_events,
589				timeouts: &timeouts,
590			};
591			if proc.wakeup(&handle, ev).is_err() {
592				dead_procs.push(*pid);
593			}
594		}
595		for pid in dead_procs.drain(..) {
596			let (_proc, io_tokens) = processes.remove(&pid).unwrap();
597			// Unregister all the poll tokens it has.
598			for token in io_tokens {
599				sources.unregister(&Source::Io { pid, token });
600			}
601			// Unregister the waker.
602			sources.unregister(&Source::ProcWaker { pid });
603			// Remove all timers.
604			timeout_mgr.retain_by_key(|k| k.pid != pid);
605		}
606	}
607}