ttl_queue/
lib.rs

1//! # Timed Queue
2//!
3//! A queue that drops its content after a given amount of time.
4//!
5//! ## Crate Features
6//!
7//! * `vecdeque` - Uses a `VecDeque` as the underlying data structure. Enabled by default.
8//! * `doublestack` - Uses two stacks (`Vec`) as the underlying data structure. Mutually exclusive with `vecdeque`.
9//! * `tokio` - Uses [`tokio::time::Instant`] instead of [`std::time::Instant`].
10//!
11//! ## Example
12//!
13//! To implement an FPS counter, you could use the following technique:
14//!
15//! ```
16//! # use std::thread;
17//! # use std::time::Duration;
18//! # use ttl_queue::TtlQueue;
19//! let mut fps_counter = TtlQueue::new(Duration::from_secs_f64(1.0));
20//!
21//! for i in 0..=50 {
22//!     // Register a new frame and return the number of frames observed
23//!     // within the last second.
24//!     let fps = fps_counter.refresh_and_push_back(());
25//!     debug_assert!(fps >= 1);
26//!
27//!     // Sleep ~20 ms to achieve a ~50 Hz frequency.
28//!     thread::sleep(Duration::from_millis(19));
29//! }
30//!
31//! let fps = fps_counter.refresh();
32//! debug_assert!(fps >= 45 && fps <= 55);
33//!
34//! let delta = fps_counter.avg_delta();
35//! debug_assert!(delta >= Duration::from_millis(19) && delta <= Duration::from_millis(21));
36//! ```
37
38use std::time::Duration;
39
40#[cfg(not(feature = "tokio"))]
41use std::time::Instant;
42
43#[cfg(feature = "tokio")]
44use tokio::time::Instant;
45
46#[cfg(feature = "vecdeque")]
47use std::collections::VecDeque;
48
49/// A queue that drops its content after a given amount of time.
50///
51/// ## Example
52///
53/// To implement an FPS counter, you could use the following technique:
54///
55/// ```
56/// # use std::thread;
57/// # use std::time::Duration;
58/// # use ttl_queue::TtlQueue;
59/// let mut fps_counter = TtlQueue::new(Duration::from_secs_f64(1.0));
60///
61/// for i in 0..=50 {
62///     // Register a new frame and return the number of frames observed
63///     // within the last second.
64///     let fps = fps_counter.refresh_and_push_back(());
65///     debug_assert!(fps >= 1);
66///
67///     // Sleep ~20 ms to achieve a ~50 Hz frequency.
68///     thread::sleep(Duration::from_millis(19));
69/// }
70///
71/// let fps = fps_counter.refresh();
72/// debug_assert!(fps >= 45 && fps <= 55);
73///
74/// let delta = fps_counter.avg_delta();
75/// debug_assert!(delta >= Duration::from_millis(19) && delta <= Duration::from_millis(21));
76/// ```
77#[derive(Debug)]
78pub struct TtlQueue<T> {
79    ttl: Duration,
80    #[cfg(feature = "doublestack")]
81    stack_1: Vec<(Instant, T)>,
82    #[cfg(feature = "doublestack")]
83    stack_2: Vec<(Instant, T)>,
84    #[cfg(feature = "vecdeque")]
85    queue: VecDeque<(Instant, T)>,
86}
87
88impl<T> TtlQueue<T> {
89    /// Creates an empty [`TtlQueue`] with default capacity.
90    pub fn new(ttl: Duration) -> Self {
91        Self {
92            ttl,
93            #[cfg(feature = "doublestack")]
94            stack_1: Vec::new(),
95            #[cfg(feature = "doublestack")]
96            stack_2: Vec::new(),
97            #[cfg(feature = "vecdeque")]
98            queue: VecDeque::new(),
99        }
100    }
101
102    /// Creates an empty [`TtlQueue`] for at least `capacity` elements.
103    pub fn with_capacity(ttl: Duration, capacity: usize) -> Self {
104        Self {
105            ttl,
106            #[cfg(feature = "doublestack")]
107            stack_1: Vec::with_capacity(capacity),
108            #[cfg(feature = "doublestack")]
109            stack_2: Vec::with_capacity(capacity),
110            #[cfg(feature = "vecdeque")]
111            queue: VecDeque::with_capacity(capacity),
112        }
113    }
114
115    /// Pushes an element to the end of the queue.
116    pub fn push_back(&mut self, element: T) {
117        self.push_back_entry(Instant::now(), element)
118    }
119
120    /// Pushes an element to the end of the queue.
121    fn push_back_entry(&mut self, instant: Instant, element: T) {
122        let entry = (instant, element);
123        #[cfg(feature = "doublestack")]
124        {
125            self.stack_1.push(entry);
126        }
127        #[cfg(feature = "vecdeque")]
128        {
129            self.queue.push_back(entry)
130        }
131    }
132
133    /// Pushes an element to the end of the queue and returns the number of items
134    /// currently in the queue. This operation is O(N) at worst.
135    pub fn refresh_and_push_back(&mut self, element: T) -> usize {
136        let count = self.refresh();
137        self.push_back(element);
138        count + 1
139    }
140
141    /// Gets the element from the front of the queue if it exists, as well as the
142    /// time instant at which it was added.
143    pub fn pop_front(&mut self) -> Option<(Instant, T)> {
144        #[cfg(feature = "doublestack")]
145        {
146            self.ensure_stack_full(false);
147            self.stack_2.pop()
148        }
149        #[cfg(feature = "vecdeque")]
150        {
151            self.queue.pop_front()
152        }
153    }
154
155    /// Similar to [`pop_front`](Self::pop_front) but without removing the element.
156    pub fn peek_front(&mut self) -> Option<&(Instant, T)> {
157        #[cfg(feature = "doublestack")]
158        {
159            self.ensure_stack_full(false);
160            self.stack_2.first()
161        }
162        #[cfg(feature = "vecdeque")]
163        {
164            self.queue.front()
165        }
166    }
167
168    #[cfg(feature = "doublestack")]
169    fn ensure_stack_full(&mut self, force: bool) {
170        if self.stack_2.is_empty() || force {
171            while let Some(item) = self.stack_1.pop() {
172                self.stack_2.push(item);
173            }
174        }
175    }
176
177    /// Gets the number elements currently in the queue, including potentially expired elements.
178    ///
179    /// This operation is O(1). In order to obtain an accurate count in O(N) (worst-case),
180    /// use [`refresh`](Self::refresh) instead.
181    pub fn len(&self) -> usize {
182        #[cfg(feature = "doublestack")]
183        {
184            self.stack_1.len() + self.stack_2.len()
185        }
186        #[cfg(feature = "vecdeque")]
187        {
188            self.queue.len()
189        }
190    }
191
192    /// Returns `true` if the queue is definitely empty or `false` if the queue is
193    /// possibly empty.
194    ///
195    /// This operation is O(1). In order to obtain an accurate count in O(N) (worst-case),
196    /// use [`refresh`](Self::refresh) instead.
197    pub fn is_empty(&self) -> bool {
198        #[cfg(feature = "doublestack")]
199        {
200            self.stack_1.is_empty() && self.stack_2.is_empty()
201        }
202        #[cfg(feature = "vecdeque")]
203        {
204            self.queue.is_empty()
205        }
206    }
207
208    /// Refreshes the queue and returns the number of currently contained elements.
209    #[cfg(feature = "doublestack")]
210    pub fn refresh(&mut self) -> usize {
211        let now = Instant::now();
212
213        while let Some((instant, _element)) = self.stack_2.first() {
214            if (now - *instant) < self.ttl {
215                break;
216            }
217
218            let _result = self.stack_2.pop();
219            debug_assert!(_result.is_some());
220        }
221
222        if !self.stack_2.is_empty() {
223            return self.len();
224        }
225
226        while let Some((instant, _element)) = self.stack_1.first() {
227            if (now - *instant) < self.ttl {
228                break;
229            }
230
231            let _result = self.stack_1.pop();
232            debug_assert!(_result.is_some());
233        }
234
235        debug_assert_eq!(self.stack_1.len(), self.len());
236        self.stack_1.len()
237    }
238
239    /// Refreshes the queue and returns the number of currently contained elements.
240    #[cfg(feature = "vecdeque")]
241    pub fn refresh(&mut self) -> usize {
242        let now = Instant::now();
243
244        while let Some((instant, _element)) = self.queue.front() {
245            if (now - *instant) < self.ttl {
246                break;
247            }
248
249            let _result = self.queue.pop_front();
250            debug_assert!(_result.is_some());
251        }
252
253        self.queue.len()
254    }
255
256    /// Returns an iterator to the data.
257    pub fn iter(&self) -> impl Iterator<Item = &(Instant, T)> {
258        #[cfg(feature = "doublestack")]
259        {
260            return DoubleStackIterator::new(&self);
261        }
262        #[cfg(feature = "vecdeque")]
263        {
264            self.queue.iter()
265        }
266    }
267
268    /// Returns the average duration between two events.
269    pub fn avg_delta(&self) -> Duration {
270        if self.len() <= 1 {
271            return Duration::ZERO;
272        }
273
274        let (count, sum) = self
275            .iter()
276            .zip(self.iter().skip(1))
277            .fold((0, Duration::ZERO), |(count, sum), (lhs, rhs)| {
278                (count + 1, sum + (rhs.0 - lhs.0))
279            });
280
281        debug_assert_ne!(count, 0);
282        sum / count
283    }
284}
285
286impl<T> IntoIterator for TtlQueue<T> {
287    type Item = (Instant, T);
288
289    #[cfg(feature = "vecdeque")]
290    type IntoIter = std::collections::vec_deque::IntoIter<Self::Item>;
291
292    #[cfg(feature = "doublestack")]
293    type IntoIter = std::iter::Chain<
294        std::iter::Rev<std::vec::IntoIter<Self::Item>>,
295        std::vec::IntoIter<Self::Item>,
296    >;
297
298    fn into_iter(self) -> Self::IntoIter {
299        #[cfg(feature = "vecdeque")]
300        {
301            self.queue.into_iter()
302        }
303        #[cfg(feature = "doublestack")]
304        {
305            self.stack_2
306                .into_iter()
307                .rev()
308                .chain(self.stack_1.into_iter())
309        }
310    }
311}
312
/// Borrowing iterator over a `doublestack`-backed [`TtlQueue`].
///
/// Walks `stack_2` from its top downwards first, then `stack_1` in push order.
#[cfg(feature = "doublestack")]
pub struct DoubleStackIterator<'a, T> {
    /// Queue being traversed; needed to start on `stack_1` once `stack_2`
    /// has been exhausted.
    queue: &'a TtlQueue<T>,
    /// Which of the two stacks is currently being walked.
    stage: DoubleStackIteratorStage<'a, T>,
}
318
/// Traversal state of a [`DoubleStackIterator`].
#[cfg(feature = "doublestack")]
enum DoubleStackIteratorStage<'a, T> {
    /// Walking a slice in reverse order.
    First(std::iter::Rev<std::slice::Iter<'a, (Instant, T)>>),
    /// Walking a slice in forward order.
    Second(std::slice::Iter<'a, (Instant, T)>),
    /// Nothing left to yield.
    Done,
}
325
/// Forwards `next` to whichever slice iterator the stage currently wraps.
#[cfg(feature = "doublestack")]
impl<'a, T> Iterator for DoubleStackIteratorStage<'a, T> {
    type Item = &'a (Instant, T);

    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Done => None,
            Self::First(reversed) => reversed.next(),
            Self::Second(forward) => forward.next(),
        }
    }
}
338
#[cfg(feature = "doublestack")]
impl<'a, T> DoubleStackIterator<'a, T> {
    /// Creates an iterator over `queue` that starts on `stack_2`, walked in
    /// reverse.
    pub fn new(queue: &'a TtlQueue<T>) -> Self {
        let reversed_outbox = queue.stack_2.iter().rev();
        let stage = DoubleStackIteratorStage::First(reversed_outbox);
        Self { queue, stage }
    }
}
348
/// Yields the entries of `stack_2` in reverse order, followed by the entries
/// of `stack_1` in push order.
///
/// Once exhausted, the iterator keeps returning `None` on further calls.
#[cfg(feature = "doublestack")]
impl<'a, T> Iterator for DoubleStackIterator<'a, T> {
    type Item = &'a (Instant, T);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.stage.next() {
                return Some(element);
            }

            if matches!(self.stage, DoubleStackIteratorStage::First(..)) {
                // `stack_2` is exhausted; continue with `stack_1`.
                self.stage = DoubleStackIteratorStage::Second(self.queue.stack_1.iter());
            } else {
                // Either `stack_1` just ran out or the iterator was already
                // finished. Staying in `Done` keeps repeated calls returning
                // `None` instead of tripping an assertion.
                self.stage = DoubleStackIteratorStage::Done;
                return None;
            }
        }
    }
}
371
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Entries are counted, popped in insertion order and dropped once the
    /// time-to-live has elapsed.
    #[test]
    fn it_works() {
        let mut queue = TtlQueue::new(Duration::from_millis(50));
        queue.push_back(10);
        queue.push_back(20);
        queue.push_back(30);
        assert_eq!(queue.refresh(), 3);

        let value = queue.pop_front().unwrap();
        assert_eq!(value.1, 10);

        assert_eq!(queue.refresh(), 2);

        thread::sleep(Duration::from_millis(50));
        assert_eq!(queue.refresh(), 0);
    }

    /// Borrowing iteration yields the values in insertion order.
    #[test]
    fn iter_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        for i in 0..1000 {
            queue.push_back((i * 10) as usize);

            // Ensure data is both in stack 1 and stack 2
            #[cfg(feature = "doublestack")]
            {
                if i == 500 {
                    queue.ensure_stack_full(true);
                }
            }
        }

        for (i, (_instant, value)) in queue.iter().enumerate() {
            assert_eq!(*value, i * 10);
        }
    }

    /// Consuming iteration yields the values in insertion order.
    #[test]
    fn into_iter_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        for i in 0..100 {
            queue.push_back((i * 10) as usize);

            // Ensure data is both in stack 1 and stack 2
            #[cfg(feature = "doublestack")]
            {
                if i == 50 {
                    queue.ensure_stack_full(true);
                }
            }
        }

        for (i, (_instant, value)) in queue.into_iter().enumerate() {
            assert_eq!(value, i * 10);
        }
    }

    /// Ten entries spaced exactly one second apart average to one second.
    #[test]
    fn avg_duration_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        let now = Instant::now();

        for i in 0..10 {
            queue.push_back_entry(now + Duration::from_secs(i), ());
        }

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::from_secs(1));
    }

    /// An empty queue has no deltas to average.
    #[test]
    fn avg_duration_with_zero_inputs_works() {
        let queue = TtlQueue::<()>::new(Duration::MAX);

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::ZERO);
    }

    /// A single entry has no successor, hence no delta.
    #[test]
    fn avg_duration_with_one_inputs_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        queue.push_back(());

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::ZERO);
    }

    /// Mirrors the documentation example. Uses `assert!` rather than
    /// `debug_assert!` so the checks also run under `cargo test --release`.
    #[test]
    fn fps_counter() {
        let mut fps_counter = TtlQueue::new(Duration::from_secs(1));

        for _i in 0..50 {
            // Register a new frame and return the number of frames observed
            // within the last second.
            let fps = fps_counter.refresh_and_push_back(());
            assert!(fps >= 1);

            // Sleep ~20 ms to achieve a ~50 Hz frequency.
            thread::sleep(Duration::from_millis(19));
        }

        let fps = fps_counter.refresh();
        assert!((45..=55).contains(&fps), "unexpected frame count: {}", fps);

        let delta = fps_counter.avg_delta();
        assert!(
            delta >= Duration::from_millis(19) && delta <= Duration::from_millis(21),
            "unexpected average delta: {:?}",
            delta
        );
    }
}