use event_listener::{Event, EventListener};
use futures_intrusive::sync::ManualResetEvent;
use parking_lot::RwLock;
use priority_queue::PriorityQueue;
use slab::Slab;
use std::{cmp::Reverse, collections::HashMap, sync::Arc};
#[derive(Default)]
pub struct TimeHeap {
inner: RwLock<Inner>,
waker: Event,
}
impl TimeHeap {
pub fn subscribe(&self, tick: u64) -> Arc<ManualResetEvent> {
{
let inner = self.inner.read();
if let Some(&evt_idx) = inner.tick_to_evt.get(&tick) {
return inner.events[evt_idx].clone();
}
}
{
let mut inner = self.inner.write();
let pre_first = inner.lowest();
if let Some(&evt_idx) = inner.tick_to_evt.get(&tick) {
return inner.events[evt_idx].clone();
}
let evt = Arc::new(ManualResetEvent::new(false));
let evt_idx = inner.events.insert(evt.clone());
inner.tick_to_evt.insert(tick, evt_idx);
inner.evt_to_tick.push(evt_idx, Reverse(tick));
let post_first = inner.lowest();
if pre_first != post_first {
self.waker.notify(1);
}
evt
}
}
pub fn wait_until_change(&self) -> EventListener {
self.waker.listen()
}
pub fn earliest_tick(&self) -> Option<u64> {
self.inner.read().lowest()
}
pub fn fire_before(&self, tick: u64) {
let mut inner = self.inner.write();
while let Some((evt_id, evt_tick)) = inner.evt_to_tick.pop() {
if evt_tick.0 > tick {
inner.evt_to_tick.push(evt_id, evt_tick);
break;
}
log::trace!("firing evt_id={evt_id}, evt_tick={}", evt_tick.0);
inner.events.remove(evt_id).set();
inner.tick_to_evt.remove(&tick);
}
}
}
#[derive(Default)]
pub struct Inner {
events: Slab<Arc<ManualResetEvent>>,
evt_to_tick: PriorityQueue<usize, Reverse<u64>>,
tick_to_evt: HashMap<u64, usize>,
}
impl Inner {
fn lowest(&self) -> Option<u64> {
self.evt_to_tick.peek().map(|v| v.1).map(|s| s.0)
}
}