use std::cmp::Ordering;
use std::collections::BinaryHeap;
use crate::time::Time;
#[derive(Debug, Clone)]
pub struct Scheduled<E> {
pub time: Time,
pub sequence: u64,
pub event: E,
}
impl<E> Scheduled<E> {
pub fn new(time: Time, sequence: u64, event: E) -> Self {
Scheduled {
time,
sequence,
event,
}
}
}
impl<E> PartialEq for Scheduled<E> {
fn eq(&self, other: &Self) -> bool {
self.time == other.time && self.sequence == other.sequence
}
}
impl<E> Eq for Scheduled<E> {}
impl<E> PartialOrd for Scheduled<E> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<E> Ord for Scheduled<E> {
fn cmp(&self, other: &Self) -> Ordering {
match other.time.cmp(&self.time) {
Ordering::Equal => other.sequence.cmp(&self.sequence),
ordering => ordering,
}
}
}
#[derive(Debug)]
pub struct EventQueue<E> {
heap: BinaryHeap<Scheduled<E>>,
next_sequence: u64,
}
impl<E> EventQueue<E> {
pub fn new() -> Self {
EventQueue {
heap: BinaryHeap::new(),
next_sequence: 0,
}
}
pub fn with_capacity(capacity: usize) -> Self {
EventQueue {
heap: BinaryHeap::with_capacity(capacity),
next_sequence: 0,
}
}
pub fn schedule(&mut self, time: Time, event: E) -> u64 {
let sequence = self.next_sequence;
self.next_sequence += 1;
self.heap.push(Scheduled::new(time, sequence, event));
sequence
}
pub fn pop(&mut self) -> Option<Scheduled<E>> {
self.heap.pop()
}
pub fn peek(&self) -> Option<&Scheduled<E>> {
self.heap.peek()
}
pub fn is_empty(&self) -> bool {
self.heap.is_empty()
}
pub fn len(&self) -> usize {
self.heap.len()
}
pub fn clear(&mut self) {
self.heap.clear();
}
pub fn total_scheduled(&self) -> u64 {
self.next_sequence
}
pub fn rewrite<F>(&mut self, mut f: F)
where
F: FnMut(Scheduled<E>) -> Option<(Time, E)>,
{
let drained: Vec<_> = self.heap.drain().collect();
for s in drained {
if let Some((time, event)) = f(s) {
self.schedule(time, event);
}
}
}
}
impl<E> Default for EventQueue<E> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Debug, Clone, PartialEq)]
struct TestEvent(u32);
#[test]
fn events_ordered_by_time() {
let mut queue = EventQueue::new();
queue.schedule(Time::from_millis(300), TestEvent(3));
queue.schedule(Time::from_millis(100), TestEvent(1));
queue.schedule(Time::from_millis(200), TestEvent(2));
assert_eq!(queue.pop().unwrap().event, TestEvent(1));
assert_eq!(queue.pop().unwrap().event, TestEvent(2));
assert_eq!(queue.pop().unwrap().event, TestEvent(3));
assert!(queue.pop().is_none());
}
#[test]
fn ties_broken_by_sequence() {
let mut queue = EventQueue::new();
let t = Time::from_millis(100);
queue.schedule(t, TestEvent(1));
queue.schedule(t, TestEvent(2));
queue.schedule(t, TestEvent(3));
assert_eq!(queue.pop().unwrap().event, TestEvent(1));
assert_eq!(queue.pop().unwrap().event, TestEvent(2));
assert_eq!(queue.pop().unwrap().event, TestEvent(3));
}
#[test]
fn sequence_numbers_are_monotonic() {
let mut queue = EventQueue::new();
let s1 = queue.schedule(Time::ZERO, TestEvent(1));
let s2 = queue.schedule(Time::ZERO, TestEvent(2));
let s3 = queue.schedule(Time::ZERO, TestEvent(3));
assert_eq!(s1, 0);
assert_eq!(s2, 1);
assert_eq!(s3, 2);
assert_eq!(queue.total_scheduled(), 3);
}
#[test]
fn peek_does_not_remove() {
let mut queue = EventQueue::new();
queue.schedule(Time::from_millis(100), TestEvent(1));
assert_eq!(queue.peek().unwrap().event, TestEvent(1));
assert_eq!(queue.len(), 1);
assert_eq!(queue.pop().unwrap().event, TestEvent(1));
assert!(queue.is_empty());
}
#[test]
fn rewrite_filters_and_replaces_events() {
let mut queue = EventQueue::new();
queue.schedule(Time::from_millis(10), TestEvent(1));
queue.schedule(Time::from_millis(20), TestEvent(2));
queue.schedule(Time::from_millis(30), TestEvent(3));
queue.rewrite(|s| match s.event.0 {
2 => None,
3 => Some((Time::from_millis(5), TestEvent(99))),
_ => Some((s.time, s.event)),
});
let drained: Vec<_> = std::iter::from_fn(|| queue.pop())
.map(|s| (s.time, s.event.0))
.collect();
assert_eq!(
drained,
vec![(Time::from_millis(5), 99), (Time::from_millis(10), 1)]
);
}
#[test]
fn rewrite_assigns_fresh_sequence_numbers() {
let mut queue = EventQueue::new();
queue.schedule(Time::from_millis(100), TestEvent(1));
queue.schedule(Time::from_millis(200), TestEvent(2));
queue.rewrite(|s| Some((Time::from_millis(50), s.event)));
assert_eq!(queue.pop().unwrap().event.0, 1);
assert_eq!(queue.pop().unwrap().event.0, 2);
}
#[test]
fn complex_ordering() {
let mut queue = EventQueue::new();
queue.schedule(Time::from_millis(50), TestEvent(2));
queue.schedule(Time::from_millis(100), TestEvent(4));
queue.schedule(Time::from_millis(50), TestEvent(3));
queue.schedule(Time::from_millis(25), TestEvent(1));
queue.schedule(Time::from_millis(100), TestEvent(5));
let events: Vec<_> = std::iter::from_fn(|| queue.pop())
.map(|s| s.event.0)
.collect();
assert_eq!(events, vec![1, 2, 3, 4, 5]);
}
}