1use candid::CandidType;
5use serde::{Deserialize, Serialize};
6use std::collections::{btree_map::Entry, BTreeMap};
7use std::{error, fmt};
8
9#[derive(
10 CandidType, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Hash,
11)]
12pub struct EventId<I> {
14 pub expiry: u64,
16 pub nonce: I,
18}
19
20impl<I> EventId<I> {
21 pub fn new(expiry: u64, nonce: I) -> Self {
22 Self { expiry, nonce }
23 }
24}
25
26#[derive(Clone, Debug)]
27pub struct EventQueueFullError;
29
30impl error::Error for EventQueueFullError {}
31
32impl fmt::Display for EventQueueFullError {
33 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34 f.write_str("Event queue is full")
35 }
36}
37
38#[derive(CandidType, Serialize, Deserialize, Clone, Debug, Hash)]
39pub struct EventQueue<I: Ord, T> {
42 capacity: usize,
43 events: BTreeMap<EventId<I>, T>,
44 index: BTreeMap<I, EventId<I>>,
45}
46
47impl<I: Ord, T> Default for EventQueue<I, T> {
48 fn default() -> Self {
50 EventQueue::new(0)
51 }
52}
53
54fn is_vacant<K, V>(entry: &Entry<K, V>) -> bool {
55 matches!(entry, Entry::Vacant(_))
56}
57
58impl<I: Ord, T> EventQueue<I, T> {
59 pub fn new(capacity: usize) -> Self {
61 Self {
62 capacity,
63 events: BTreeMap::default(),
64 index: BTreeMap::default(),
65 }
66 }
67
68 pub fn insert(&mut self, id: EventId<I>, event: T) -> Result<(), EventQueueFullError>
72 where
73 I: Clone,
74 {
75 let size = self.events.len();
76 let entry = self.events.entry(id.clone());
77 if is_vacant(&entry) {
78 if size >= self.capacity {
79 return Err(EventQueueFullError);
80 }
81 entry.or_insert(event);
82 self.index.insert(id.nonce.clone(), id);
83 } else {
84 entry.and_modify(|v| *v = event);
85 }
86 Ok(())
87 }
88
89 pub fn pop_front<F: Fn(&T) -> bool>(&mut self, matching: F) -> Option<(EventId<I>, T)>
92 where
93 I: Clone,
94 {
95 if let Some(id) =
96 self.events
97 .iter()
98 .find_map(|(id, e)| if matching(e) { Some(id.clone()) } else { None })
99 {
100 self.index.remove(&id.nonce);
101 self.events.remove(&id).map(|e| (id.clone(), e))
102 } else {
103 None
104 }
105 }
106
107 pub fn purge<F: Fn(&T) -> bool>(&mut self, expiry: u64, matching: F)
110 where
111 I: Default,
112 {
113 let key = EventId {
114 expiry,
115 nonce: I::default(),
116 };
117 let mut newer_events = self.events.split_off(&key);
118 self.events.retain(|k, v| {
119 let to_remove = matching(v);
120 if to_remove {
121 self.index.remove(&k.nonce);
122 }
123 !to_remove
124 });
125 self.events.append(&mut newer_events);
126 }
127
128 pub fn purge_expired(&mut self, expiry: u64)
131 where
132 I: Default,
133 {
134 let key = EventId {
135 expiry,
136 nonce: I::default(),
137 };
138 let to_keep = self.events.split_off(&key);
139 for to_remove in self.events.keys() {
140 self.index.remove(&to_remove.nonce);
141 }
142 self.events = to_keep;
143 }
144
145 pub fn get(&self, id: &EventId<I>) -> Option<&T> {
148 self.events.get(id)
149 }
150
151 pub fn remove(&mut self, id: &EventId<I>) -> Option<T> {
154 if let Some(value) = self.events.remove(id) {
155 self.index.remove(&id.nonce);
156 Some(value)
157 } else {
158 None
159 }
160 }
161
162 pub fn modify<F: FnOnce(&mut T)>(&mut self, id: EventId<I>, f: F) {
165 self.events.entry(id).and_modify(|v: &mut T| f(v));
166 }
167
168 pub fn find(&self, nonce: &I) -> Option<(&EventId<I>, &T)> {
171 self.index
172 .get(nonce)
173 .and_then(|id| self.events.get(id).map(|e| (id, e)))
174 }
175
176 pub fn capacity(&self) -> usize {
178 self.capacity
179 }
180
181 pub fn len(&self) -> usize {
184 self.events.len()
185 }
186
187 pub fn is_full(&self) -> bool {
190 self.events.len() == self.capacity
191 }
192
193 pub fn is_empty(&self) -> bool {
196 self.events.is_empty()
197 }
198
199 pub fn iter(&self) -> Box<dyn Iterator<Item = (&EventId<I>, &T)> + '_> {
201 Box::new(self.events.iter())
202 }
203
204 #[cfg(test)]
207 fn selfcheck(&self) -> bool
208 where
209 I: Eq + Clone,
210 {
211 use std::collections::BTreeSet;
212 self.len() <= self.capacity
213 && self.events.len() == self.index.len()
214 && self.events.keys().collect::<Vec<_>>() == self.index.values().collect::<Vec<_>>()
215 && self.index.keys().cloned().collect::<BTreeSet<_>>().len() == self.index.len()
216 }
217}
218
219#[cfg(test)]
220mod test {
221 use super::*;
222 use assert_matches::*;
223
224 #[derive(Clone, Debug, PartialEq, Eq)]
225 enum Item {
226 Begin(u8),
227 Middle(u8),
228 End(u8),
229 }
230 use Item::*;
231
232 fn is_begin(item: &Item) -> bool {
233 matches!(item, Begin(_))
234 }
235
236 fn is_middle(item: &Item) -> bool {
237 matches!(item, Middle(_))
238 }
239
240 fn is_end(item: &Item) -> bool {
241 matches!(item, End(_))
242 }
243
244 #[test]
245 fn basic() {
246 let mut q: EventQueue<u8, Item> = EventQueue::new(10);
247 assert!(q.is_empty());
248 assert_eq!(q.len(), 0);
249 assert!(q.selfcheck());
250
251 for i in 0..10 {
252 let expiry = if i < 5 { 0 } else { 1 };
253 assert_matches!(q.insert(EventId::new(expiry, i), Begin(i)), Ok(_));
254 assert!(q.selfcheck());
255 }
256 assert_matches!(
258 q.insert(EventId::new(2, 0), Begin(0)),
259 Err(EventQueueFullError)
260 );
261 assert!(q.selfcheck());
262 assert_matches!(q.insert(EventId::new(0, 1), Middle(10)), Ok(_));
264 assert_matches!(q.insert(EventId::new(0, 4), End(40)), Ok(_));
265 assert!(q.selfcheck());
266
267 for i in 0..10 {
269 assert_matches!(q.find(&i), Some(_));
270 }
271
272 assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 0), Begin(0))));
274 assert!(q.selfcheck());
275 assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 2), Begin(2))));
276 assert!(q.selfcheck());
277 assert_eq!(
278 q.pop_front(is_middle),
279 Some((EventId::new(0, 1), Middle(10)))
280 );
281 assert!(q.selfcheck());
282 assert_eq!(q.len(), 7);
283
284 let mut p = q.clone();
285
286 q.purge(1, is_begin);
288 assert_eq!(q.len(), 6);
289 assert!(q.selfcheck());
290 assert_eq!(q.pop_front(is_begin), Some((EventId::new(1, 5), Begin(5))));
291 assert!(q.selfcheck());
292 assert_eq!(q.pop_front(is_end), Some((EventId::new(0, 4), End(40))));
293 assert!(q.selfcheck());
294 assert_eq!(q.len(), 4);
295
296 q.purge(2, is_begin);
297 assert_eq!(q.len(), 0);
298 assert!(q.selfcheck());
299
300 println!("{:?}", p.iter().collect::<Vec<_>>());
302 p.purge_expired(1);
303 assert_eq!(p.len(), 5);
304 assert!(p.selfcheck());
305
306 p.purge_expired(2);
307 assert_eq!(p.len(), 0);
308 assert!(p.selfcheck());
309 }
310}