spnr_lib/
event_queue.rs

//! An event queue ordered by expiry time.
//!
//! Each event has an id and a status; events can be inserted into, looked up in,
//! removed from, or purged from the queue.
4use candid::CandidType;
5use serde::{Deserialize, Serialize};
6use std::collections::{btree_map::Entry, BTreeMap};
7use std::{error, fmt};
8
/// Identifier of an event stored in an event queue.
///
/// The `nonce` field of an `EventId` must be unique, and is usually randomly generated.
///
/// The derived ordering is lexicographic in field declaration order, so ids
/// compare by `expiry` first and by `nonce` second. Do not reorder the fields.
#[derive(
    CandidType, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Hash,
)]
pub struct EventId<I> {
    /// Expiration time since UNIX epoch (in nanoseconds).
    pub expiry: u64,
    /// Unique identifier of an event.
    pub nonce: I,
}
19
20impl<I> EventId<I> {
21    pub fn new(expiry: u64, nonce: I) -> Self {
22        Self { expiry, nonce }
23    }
24}
25
/// Error returned when an operation cannot be performed because the event queue
/// has reached its max capacity.
///
/// `PartialEq`/`Eq` are derived so that results carrying this error can be
/// compared directly (e.g. with `assert_eq!`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EventQueueFullError;
29
// No methods are overridden: the trait's default implementations are used as-is.
impl error::Error for EventQueueFullError {}
31
32impl fmt::Display for EventQueueFullError {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        f.write_str("Event queue is full")
35    }
36}
37
/// An event queue is an ordered list of events of nonce type `I` and status type `T`.
/// Its max capacity is fixed at initialization.
///
/// Events are kept in two maps that must stay in sync: `events` holds the
/// statuses in queue order, and `index` allows lookup by nonce alone.
#[derive(CandidType, Serialize, Deserialize, Clone, Debug, Hash)]
pub struct EventQueue<I: Ord, T> {
    /// Maximum number of events `insert` admits before returning an error.
    capacity: usize,
    /// Status of every queued event, keyed (and therefore ordered) by its id:
    /// expiry first, then nonce.
    events: BTreeMap<EventId<I>, T>,
    /// Maps each queued nonce to its full id, so `find` can locate an event
    /// without knowing its expiry.
    index: BTreeMap<I, EventId<I>>,
}
46
47impl<I: Ord, T> Default for EventQueue<I, T> {
48    /// Return an empty event queue of zero capacity.
49    fn default() -> Self {
50        EventQueue::new(0)
51    }
52}
53
/// Report whether a map entry has no value stored under its key yet.
fn is_vacant<K, V>(entry: &Entry<K, V>) -> bool {
    match entry {
        Entry::Vacant(_) => true,
        Entry::Occupied(_) => false,
    }
}
57
58impl<I: Ord, T> EventQueue<I, T> {
59    /// Return an event queue of the given max capacity.
60    pub fn new(capacity: usize) -> Self {
61        Self {
62            capacity,
63            events: BTreeMap::default(),
64            index: BTreeMap::default(),
65        }
66    }
67
68    /// Insert an event of the given id and status into the queue.
69    /// Return error if the queue is full.
70    /// This operation is `O(log(n))`.
71    pub fn insert(&mut self, id: EventId<I>, event: T) -> Result<(), EventQueueFullError>
72    where
73        I: Clone,
74    {
75        let size = self.events.len();
76        let entry = self.events.entry(id.clone());
77        if is_vacant(&entry) {
78            if size >= self.capacity {
79                return Err(EventQueueFullError);
80            }
81            entry.or_insert(event);
82            self.index.insert(id.nonce.clone(), id);
83        } else {
84            entry.and_modify(|v| *v = event);
85        }
86        Ok(())
87    }
88
89    /// Remove and return the first matching event.
90    /// This operation is linear in the events it has to search through.
91    pub fn pop_front<F: Fn(&T) -> bool>(&mut self, matching: F) -> Option<(EventId<I>, T)>
92    where
93        I: Clone,
94    {
95        if let Some(id) =
96            self.events
97                .iter()
98                .find_map(|(id, e)| if matching(e) { Some(id.clone()) } else { None })
99        {
100            self.index.remove(&id.nonce);
101            self.events.remove(&id).map(|e| (id.clone(), e))
102        } else {
103            None
104        }
105    }
106
107    /// Remove all events of the matching status with an expiry less than the given expiry.
108    /// This operation is in the number of events matching the expiry condition.
109    pub fn purge<F: Fn(&T) -> bool>(&mut self, expiry: u64, matching: F)
110    where
111        I: Default,
112    {
113        let key = EventId {
114            expiry,
115            nonce: I::default(),
116        };
117        let mut newer_events = self.events.split_off(&key);
118        self.events.retain(|k, v| {
119            let to_remove = matching(v);
120            if to_remove {
121                self.index.remove(&k.nonce);
122            }
123            !to_remove
124        });
125        self.events.append(&mut newer_events);
126    }
127
128    /// Remove all events with an expiry less than the given expiry.
129    /// This operation is `O(log(n))+O(m)`, where `m` is the number of events removed.
130    pub fn purge_expired(&mut self, expiry: u64)
131    where
132        I: Default,
133    {
134        let key = EventId {
135            expiry,
136            nonce: I::default(),
137        };
138        let to_keep = self.events.split_off(&key);
139        for to_remove in self.events.keys() {
140            self.index.remove(&to_remove.nonce);
141        }
142        self.events = to_keep;
143    }
144
145    /// Lookup event status given an event id.
146    /// This operation is `O(log(n))`.
147    pub fn get(&self, id: &EventId<I>) -> Option<&T> {
148        self.events.get(id)
149    }
150
151    /// Lookup event status given an event id.
152    /// This operation is `O(log(n))`.
153    pub fn remove(&mut self, id: &EventId<I>) -> Option<T> {
154        if let Some(value) = self.events.remove(id) {
155            self.index.remove(&id.nonce);
156            Some(value)
157        } else {
158            None
159        }
160    }
161
162    /// Modify the status of an event given its id.
163    /// This operation is `O(log(n))`.
164    pub fn modify<F: FnOnce(&mut T)>(&mut self, id: EventId<I>, f: F) {
165        self.events.entry(id).and_modify(|v: &mut T| f(v));
166    }
167
168    /// Return both an event id and its status if the given nonce is found.
169    /// This operation is `O(log(n))`.
170    pub fn find(&self, nonce: &I) -> Option<(&EventId<I>, &T)> {
171        self.index
172            .get(nonce)
173            .and_then(|id| self.events.get(id).map(|e| (id, e)))
174    }
175
176    /// Return max capacity.
177    pub fn capacity(&self) -> usize {
178        self.capacity
179    }
180
181    /// Return number of events in the queue.
182    /// This operation is constant time.
183    pub fn len(&self) -> usize {
184        self.events.len()
185    }
186
187    /// Return true if the queue is at its max capacity.
188    /// This operation is constant time.
189    pub fn is_full(&self) -> bool {
190        self.events.len() == self.capacity
191    }
192
193    /// Return true if the queue is empty.
194    /// This operation is constant time.
195    pub fn is_empty(&self) -> bool {
196        self.events.is_empty()
197    }
198
199    /// Return an iterator of all events in the order of their `EventId`, i.e., ordered by expiry first, then by nonce.
200    pub fn iter(&self) -> Box<dyn Iterator<Item = (&EventId<I>, &T)> + '_> {
201        Box::new(self.events.iter())
202    }
203
204    /// Check consistency of the event queue.
205    /// It should always return true if the implementation is correct.
206    #[cfg(test)]
207    fn selfcheck(&self) -> bool
208    where
209        I: Eq + Clone,
210    {
211        use std::collections::BTreeSet;
212        self.len() <= self.capacity
213            && self.events.len() == self.index.len()
214            && self.events.keys().collect::<Vec<_>>() == self.index.values().collect::<Vec<_>>()
215            && self.index.keys().cloned().collect::<BTreeSet<_>>().len() == self.index.len()
216    }
217}
218
// Unit tests for the event queue; `selfcheck` is asserted after every mutation.
#[cfg(test)]
mod test {
    use super::*;
    use assert_matches::*;

    /// Event status used by the tests; the payload only distinguishes values.
    #[derive(Clone, Debug, PartialEq, Eq)]
    enum Item {
        Begin(u8),
        Middle(u8),
        End(u8),
    }
    use Item::*;

    /// Predicate: status is `Begin`.
    fn is_begin(item: &Item) -> bool {
        matches!(item, Begin(_))
    }

    /// Predicate: status is `Middle`.
    fn is_middle(item: &Item) -> bool {
        matches!(item, Middle(_))
    }

    /// Predicate: status is `End`.
    fn is_end(item: &Item) -> bool {
        matches!(item, End(_))
    }

    /// Walks one queue through insert, overwrite, find, pop_front, purge and
    /// purge_expired. Each step depends on the state left by the previous one.
    #[test]
    fn basic() {
        let mut q: EventQueue<u8, Item> = EventQueue::new(10);
        assert!(q.is_empty());
        assert_eq!(q.len(), 0);
        assert!(q.selfcheck());

        // Fill to capacity: nonces 0..5 expire at 0, nonces 5..10 expire at 1.
        for i in 0..10 {
            let expiry = if i < 5 { 0 } else { 1 };
            assert_matches!(q.insert(EventId::new(expiry, i), Begin(i)), Ok(_));
            assert!(q.selfcheck());
        }
        // cannot insert any more
        assert_matches!(
            q.insert(EventId::new(2, 0), Begin(0)),
            Err(EventQueueFullError)
        );
        assert!(q.selfcheck());
        // still allow insertion of existing key
        assert_matches!(q.insert(EventId::new(0, 1), Middle(10)), Ok(_));
        assert_matches!(q.insert(EventId::new(0, 4), End(40)), Ok(_));
        assert!(q.selfcheck());

        // confirm if everything is in there
        for i in 0..10 {
            assert_matches!(q.find(&i), Some(_));
        }

        // test pop_front
        assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 0), Begin(0))));
        assert!(q.selfcheck());
        // (0, 1) is now `Middle`, so the next `Begin` in queue order is (0, 2).
        assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 2), Begin(2))));
        assert!(q.selfcheck());
        assert_eq!(
            q.pop_front(is_middle),
            Some((EventId::new(0, 1), Middle(10)))
        );
        assert!(q.selfcheck());
        assert_eq!(q.len(), 7);

        // Snapshot the 7-event state so purge_expired can be tested independently of purge.
        let mut p = q.clone();

        // test purge
        // Only (0, 3) is both older than expiry 1 and `Begin`; (0, 4) is `End`.
        q.purge(1, is_begin);
        assert_eq!(q.len(), 6);
        assert!(q.selfcheck());
        assert_eq!(q.pop_front(is_begin), Some((EventId::new(1, 5), Begin(5))));
        assert!(q.selfcheck());
        assert_eq!(q.pop_front(is_end), Some((EventId::new(0, 4), End(40))));
        assert!(q.selfcheck());
        assert_eq!(q.len(), 4);

        // The four remaining events are all `Begin` with expiry 1.
        q.purge(2, is_begin);
        assert_eq!(q.len(), 0);
        assert!(q.selfcheck());

        // test purge_expired
        println!("{:?}", p.iter().collect::<Vec<_>>());
        // Drops the two remaining expiry-0 events, (0, 3) and (0, 4), whatever their status.
        p.purge_expired(1);
        assert_eq!(p.len(), 5);
        assert!(p.selfcheck());

        p.purge_expired(2);
        assert_eq!(p.len(), 0);
        assert!(p.selfcheck());
    }
}