delay_handler/
lib.rs

1use std::collections::{hash_map::Entry, HashMap};
2use std::fmt::Display;
3use std::hash::Hash;
4use std::time::Duration;
5
6use tokio_stream::StreamExt;
7use tokio_util::time::{delay_queue::Key, DelayQueue};
8
9/// An abstration over [`DelayQueue`] that allows you to create a delay, with associated data.
10///
11/// Users can add data to the delay-map with [`insert()`](DelayHandler::insert). The associated data
12/// is removed and returned when delay is timedout by `.await`ing on [`next()`](DelayHandler::next).
13/// Users can also prematurely remove the delay from the delay-map with [`remove()`](DelayHandler::remove).
14///
15/// ### Examples
16/// 1. Insert 3 numbers into delay-map with 10s delays, print them as they timeout
17/// ```no_run
18/// # use delay_handler::DelayHandler;
19/// # use std::time::Duration;
20/// # async fn run() {
21/// let mut handler = DelayHandler::default();
22/// // Adds 1, 2, 3 to the delay-map, each with 10s delay
23/// handler.insert(1, Duration::from_secs(10));
24/// handler.insert(2, Duration::from_secs(10));
25/// handler.insert(3, Duration::from_secs(10));
26///
27/// // Expect a delay of ~10s, after which 1, 2, 3 should print to stdout, in quick succession.
28/// while let Some(expired) = handler.next().await {
29///     println!("{}", expired);
30/// }
31/// # }
32/// ```
33/// 2. Insert 3 numbers into delay-map with different delays, print them as they timeout
34/// ```no_run
35/// # use delay_handler::DelayHandler;
36/// # use std::time::Duration;
37/// # async fn run() {
38/// let mut handler = DelayHandler::default();
39/// // Adds 1, 2 to the delay-map, with different delays
40/// handler.insert(1, Duration::from_secs(10));
41/// handler.insert(2, Duration::from_secs(5));
42///
43/// // With a delay of ~5s between, the prints should come in the order of 2 and 1.
44/// while let Some(expired) = handler.next().await {
45///     println!("{}", expired);
46/// }
47/// # }
48/// ```
49///
50/// 3. Insert 3 numbers into delay-map with different delays, remove  print as delays are timedout
51/// ```no_run
52/// # use delay_handler::DelayHandler;
53/// # use std::time::Duration;
54/// # async fn run() {
55/// let mut handler = DelayHandler::default();
56/// // Adds 1, 2, 3 to the delay-map, each with different delays
57/// handler.insert(1, Duration::from_secs(15));
58/// handler.insert(2, Duration::from_secs(5));
59/// handler.insert(3, Duration::from_secs(10));
60/// 
61/// // Remove 3 from the delay-map
62/// handler.remove(&3);
63///
64/// // Prints should be in the order of first 2 and ~10s later 1.
65/// while let Some(expired) = handler.next().await {
66///     println!("{}", expired);
67/// }
68/// # }
69/// ```
70pub struct DelayHandler<T> {
71    queue: DelayQueue<T>,
72    map: HashMap<T, Key>,
73}
74
75impl<T> DelayHandler<T>
76where
77    T: Eq + Hash + Clone + Display,
78{
79    /// Insert new timeout into the map and queue if it doesn't already exist.
80    /// If one already exists, don't .
81    pub fn insert(&mut self, item: T, period: Duration) -> bool {
82        match self.map.entry(item.clone()) {
83            Entry::Vacant(v) => {
84                let key = self.queue.insert(item, period);
85                v.insert(key);
86
87                true
88            }
89            _ => false,
90        }
91    }
92
93    /// Prematurely removes timeout from delay-map, if it didn't already exist returns false.
94    pub fn remove(&mut self, item: &T) -> bool {
95        match self.map.remove(item) {
96            Some(key) => {
97                self.queue.remove(&key).into_inner();
98
99                true
100            }
101            _ => false,
102        }
103    }
104
105    /// Remove a key from map if it has timedout and return the name.
106    pub async fn next(&mut self) -> Option<T> {
107        let item = self.queue.next().await?.into_inner();
108        self.map.remove(&item);
109
110        Some(item)
111    }
112
113    /// Check if queue is empty. Could be used as precondition in an async select operation.
114    /// NOTE: The following example assumes usage of `tokio::select`
115    ///
116    /// ```text
117    /// select! {
118    ///     ...
119    ///     Some(expired) = handler.next(), if !handler.is_empty() => println!("{}", expired)
120    ///     ...
121    /// }
122    /// ```
123    pub fn is_empty(&self) -> bool {
124        self.queue.is_empty()
125    }
126}
127
128impl<T> Default for DelayHandler<T>
129where
130    T: Eq + Hash + Clone + Display,
131{
132    fn default() -> Self {
133        Self {
134            queue: DelayQueue::new(),
135            map: HashMap::new(),
136        }
137    }
138}