erin 0.1.0

A very simple I/O reactor that allows creating green thread-like processes.
Documentation
//! Time-related functionality useful for reactors.

use std::time::{Duration, SystemTime};

/// Manages timers and triggers timeouts.
pub struct TimeoutManager<K> {
	timeouts: Vec<(K, SystemTime)>,
}

impl<K: PartialEq> TimeoutManager<K> {
	/// Create a new timeout manager.
	pub fn new() -> Self {
		Self {
			timeouts: vec![],
		}
	}

	/// Return the number of timeouts being tracked.
	pub fn len(&self) -> usize {
		self.timeouts.len()
	}

	/// Register a new timeout with an associated key and wake-up time.
	pub fn register(&mut self, key: K, time: SystemTime) {
		self.timeouts.push((key, time));
		self.timeouts.sort_unstable_by(|(_, a), (_, b)| b.cmp(a));
	}

	/// Unregister the timer with the given key.
	pub fn unregister(&mut self, key: K) {
		self.timeouts.retain(|(k, _)| *k != key)
	}

	/// Retains only the timers with keys that satisfy the predicates.
	pub fn retain_by_key<F>(&mut self, mut f: F)
	where
		F: FnMut(&K) -> bool,
	{
		self.timeouts.retain(|(k, _)| f(k));
	}

	/// Get the minimum time duration we should wait for at least one timeout
	/// to be reached.  Returns `None` if there are no timeouts.
	pub fn next(&self, now: impl Into<SystemTime>) -> Option<Duration> {
		let now = now.into();

		self.timeouts.last().map(|(_, t)| {
			if *t >= now {
				t.duration_since(now).expect("checked t is more recent than now")
			} else {
				Duration::from_secs(0)
			}
		})
	}

	/// Given the current time, populate the input vector with the keys that
	/// have timed out. Returns the number of keys that timed out.
	pub fn wake(&mut self, now: SystemTime, woken: &mut Vec<K>) -> usize {
		let before = woken.len();

		while let Some((k, t)) = self.timeouts.pop() {
			if now >= t {
				woken.push(k);
			} else {
				self.timeouts.push((k, t));
				break;
			}
		}
		woken.len() - before
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use quickcheck_macros::quickcheck;

	#[quickcheck]
	fn properties(timeouts: Vec<u64>) -> bool {
		let mut tm = TimeoutManager::new();
		let mut now = SystemTime::now();

		for t in timeouts {
			tm.register(t, now + Duration::from_secs(t));
		}

		let mut woken = Vec::new();
		while let Some(delta) = tm.next(now) {
			now.elapse(delta);
			assert!(tm.wake(now, &mut woken) > 0);
		}

		woken.windows(2).all(|w| w[0] <= w[1])
	}

	#[test]
	fn test_wake() {
		let mut tm = TimeoutManager::new();
		let now = SystemTime::now();

		tm.register(0xA, now + Duration::from_millis(8));
		tm.register(0xB, now + Duration::from_millis(16));
		tm.register(0xC, now + Duration::from_millis(64));
		tm.register(0xD, now + Duration::from_millis(72));

		let mut timeouts = Vec::new();

		assert_eq!(tm.wake(now, &mut timeouts), 0);
		assert_eq!(timeouts, vec![]);
		assert_eq!(tm.len(), 4);
		assert_eq!(
			tm.wake(now + Duration::from_millis(9), &mut timeouts),
			1
		);
		assert_eq!(timeouts, vec![0xA]);
		assert_eq!(tm.len(), 3, "one timeout has expired");

		timeouts.clear();

		assert_eq!(
			tm.wake(now + Duration::from_millis(66), &mut timeouts),
			2
		);
		assert_eq!(timeouts, vec![0xB, 0xC]);
		assert_eq!(tm.len(), 1, "another two timeouts have expired");

		timeouts.clear();

		assert_eq!(
			tm.wake(now + Duration::from_millis(96), &mut timeouts),
			1
		);
		assert_eq!(timeouts, vec![0xD]);
		assert_eq!(0, tm.len(), "all timeouts have expired");
	}
}