use candid::CandidType;
use serde::{Deserialize, Serialize};
use std::collections::{btree_map::Entry, BTreeMap};
use std::{error, fmt};
/// Identifier of a queued event.
///
/// The comparison traits are derived, so two ids compare by `expiry` first and
/// by `nonce` second (the order in which the fields are declared).
#[derive(
CandidType, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Hash,
)]
pub struct EventId<I> {
/// Expiry of the event; the primary sort key.
/// NOTE(review): the unit/epoch is not visible in this file — confirm with callers.
pub expiry: u64,
/// Caller-supplied value of type `I`; the secondary sort key.
pub nonce: I,
}
impl<I> EventId<I> {
pub fn new(expiry: u64, nonce: I) -> Self {
Self { expiry, nonce }
}
}
/// Error returned by `EventQueue::insert` when a new event does not fit
/// because the queue already holds `capacity` events.
#[derive(Debug, Clone)]
pub struct EventQueueFullError;
// Marker impl: every `Error` method keeps its default, so the type only gains
// the ability to be used where a `std::error::Error` is expected.
impl error::Error for EventQueueFullError {}
impl fmt::Display for EventQueueFullError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Event queue is full")
    }
}
/// Bounded collection of events kept in `EventId` order, with a secondary
/// lookup by nonce.
#[derive(CandidType, Serialize, Deserialize, Clone, Debug, Hash)]
pub struct EventQueue<I: Ord, T> {
/// Maximum number of events; `insert` rejects a new id once `events` holds this many.
capacity: usize,
/// Event payloads keyed by id, so iteration follows the `EventId` ordering.
events: BTreeMap<EventId<I>, T>,
/// Maps a nonce to the id it was most recently inserted under; used by `find`.
index: BTreeMap<I, EventId<I>>,
}
impl<I: Ord, T> Default for EventQueue<I, T> {
fn default() -> Self {
EventQueue::new(0)
}
}
/// Returns `true` when `entry` designates a key that is absent from its map.
fn is_vacant<K, V>(entry: &Entry<K, V>) -> bool {
    match entry {
        Entry::Vacant(_) => true,
        Entry::Occupied(_) => false,
    }
}
impl<I: Ord, T> EventQueue<I, T> {
pub fn new(capacity: usize) -> Self {
Self {
capacity,
events: BTreeMap::default(),
index: BTreeMap::default(),
}
}
pub fn insert(&mut self, id: EventId<I>, event: T) -> Result<(), EventQueueFullError>
where
I: Clone,
{
let size = self.events.len();
let entry = self.events.entry(id.clone());
if is_vacant(&entry) {
if size >= self.capacity {
return Err(EventQueueFullError);
}
entry.or_insert(event);
self.index.insert(id.nonce.clone(), id);
} else {
entry.and_modify(|v| *v = event);
}
Ok(())
}
pub fn pop_front<F: Fn(&T) -> bool>(&mut self, matching: F) -> Option<(EventId<I>, T)>
where
I: Clone,
{
if let Some(id) =
self.events
.iter()
.find_map(|(id, e)| if matching(e) { Some(id.clone()) } else { None })
{
self.index.remove(&id.nonce);
self.events.remove(&id).map(|e| (id.clone(), e))
} else {
None
}
}
pub fn purge<F: Fn(&T) -> bool>(&mut self, expiry: u64, matching: F)
where
I: Default,
{
let key = EventId {
expiry,
nonce: I::default(),
};
let mut newer_events = self.events.split_off(&key);
self.events.retain(|k, v| {
let to_remove = matching(v);
if to_remove {
self.index.remove(&k.nonce);
}
!to_remove
});
self.events.append(&mut newer_events);
}
pub fn purge_expired(&mut self, expiry: u64)
where
I: Default,
{
let key = EventId {
expiry,
nonce: I::default(),
};
let to_keep = self.events.split_off(&key);
for to_remove in self.events.keys() {
self.index.remove(&to_remove.nonce);
}
self.events = to_keep;
}
pub fn get(&self, id: &EventId<I>) -> Option<&T> {
self.events.get(id)
}
pub fn remove(&mut self, id: &EventId<I>) -> Option<T> {
if let Some(value) = self.events.remove(id) {
self.index.remove(&id.nonce);
Some(value)
} else {
None
}
}
pub fn modify<F: FnOnce(&mut T)>(&mut self, id: EventId<I>, f: F) {
self.events.entry(id).and_modify(|v: &mut T| f(v));
}
pub fn find(&self, nonce: &I) -> Option<(&EventId<I>, &T)> {
self.index
.get(nonce)
.and_then(|id| self.events.get(id).map(|e| (id, e)))
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn len(&self) -> usize {
self.events.len()
}
pub fn is_full(&self) -> bool {
self.events.len() == self.capacity
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
pub fn iter(&self) -> Box<dyn Iterator<Item = (&EventId<I>, &T)> + '_> {
Box::new(self.events.iter())
}
#[cfg(test)]
fn selfcheck(&self) -> bool
where
I: Eq + Clone,
{
use std::collections::BTreeSet;
self.len() <= self.capacity
&& self.events.len() == self.index.len()
&& self.events.keys().collect::<Vec<_>>() == self.index.values().collect::<Vec<_>>()
&& self.index.keys().cloned().collect::<BTreeSet<_>>().len() == self.index.len()
}
}
// Unit tests for `EventQueue`.
#[cfg(test)]
mod test {
use super::*;
use assert_matches::*;
// Payload with three variants so that predicates can select a subset of events.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Item {
Begin(u8),
Middle(u8),
End(u8),
}
use Item::*;
// Predicates passed to `pop_front` / `purge`, one per variant.
fn is_begin(item: &Item) -> bool {
matches!(item, Begin(_))
}
fn is_middle(item: &Item) -> bool {
matches!(item, Middle(_))
}
fn is_end(item: &Item) -> bool {
matches!(item, End(_))
}
// Walks one queue through insert, replace, find, pop_front, purge and
// purge_expired, running `selfcheck` after each mutation.
#[test]
fn basic() {
let mut q: EventQueue<u8, Item> = EventQueue::new(10);
assert!(q.is_empty());
assert_eq!(q.len(), 0);
assert!(q.selfcheck());
// Fill the queue to capacity: nonces 0..5 get expiry 0, nonces 5..10 get expiry 1.
for i in 0..10 {
let expiry = if i < 5 { 0 } else { 1 };
assert_matches!(q.insert(EventId::new(expiry, i), Begin(i)), Ok(_));
assert!(q.selfcheck());
}
// The queue is full, so an id that is not queued yet is rejected.
assert_matches!(
q.insert(EventId::new(2, 0), Begin(0)),
Err(EventQueueFullError)
);
assert!(q.selfcheck());
// Inserting under ids that are already queued replaces the payload and
// succeeds even though the queue is full.
assert_matches!(q.insert(EventId::new(0, 1), Middle(10)), Ok(_));
assert_matches!(q.insert(EventId::new(0, 4), End(40)), Ok(_));
assert!(q.selfcheck());
// Every nonce is reachable through the nonce lookup.
for i in 0..10 {
assert_matches!(q.find(&i), Some(_));
}
// `pop_front` returns the first match in id order: (0, 0), then (0, 2),
// because (0, 1) now holds a `Middle`.
assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 0), Begin(0))));
assert!(q.selfcheck());
assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 2), Begin(2))));
assert!(q.selfcheck());
assert_eq!(
q.pop_front(is_middle),
Some((EventId::new(0, 1), Middle(10)))
);
assert!(q.selfcheck());
assert_eq!(q.len(), 7);
// Keep a copy of the 7-element state to exercise `purge_expired` below.
let mut p = q.clone();
// Only one `Begin` with expiry 0 is left — (0, 3) — so exactly one event goes.
q.purge(1, is_begin);
assert_eq!(q.len(), 6);
assert!(q.selfcheck());
assert_eq!(q.pop_front(is_begin), Some((EventId::new(1, 5), Begin(5))));
assert!(q.selfcheck());
assert_eq!(q.pop_front(is_end), Some((EventId::new(0, 4), End(40))));
assert!(q.selfcheck());
assert_eq!(q.len(), 4);
// The four remaining events are `Begin`s with expiry 1; all are purged.
q.purge(2, is_begin);
assert_eq!(q.len(), 0);
assert!(q.selfcheck());
// Debug output of the copied queue (only shown when test output is not captured).
println!("{:?}", p.iter().collect::<Vec<_>>());
// The copy holds two expiry-0 events and five expiry-1 events.
p.purge_expired(1);
assert_eq!(p.len(), 5);
assert!(p.selfcheck());
p.purge_expired(2);
assert_eq!(p.len(), 0);
assert!(p.selfcheck());
}
}