delay_handler/lib.rs
1use std::collections::{hash_map::Entry, HashMap};
2use std::fmt::Display;
3use std::hash::Hash;
4use std::time::Duration;
5
6use tokio_stream::StreamExt;
7use tokio_util::time::{delay_queue::Key, DelayQueue};
8
9/// An abstration over [`DelayQueue`] that allows you to create a delay, with associated data.
10///
11/// Users can add data to the delay-map with [`insert()`](DelayHandler::insert). The associated data
12/// is removed and returned when delay is timedout by `.await`ing on [`next()`](DelayHandler::next).
13/// Users can also prematurely remove the delay from the delay-map with [`remove()`](DelayHandler::remove).
14///
15/// ### Examples
16/// 1. Insert 3 numbers into delay-map with 10s delays, print them as they timeout
17/// ```no_run
18/// # use delay_handler::DelayHandler;
19/// # use std::time::Duration;
20/// # async fn run() {
21/// let mut handler = DelayHandler::default();
22/// // Adds 1, 2, 3 to the delay-map, each with 10s delay
23/// handler.insert(1, Duration::from_secs(10));
24/// handler.insert(2, Duration::from_secs(10));
25/// handler.insert(3, Duration::from_secs(10));
26///
27/// // Expect a delay of ~10s, after which 1, 2, 3 should print to stdout, in quick succession.
28/// while let Some(expired) = handler.next().await {
29/// println!("{}", expired);
30/// }
31/// # }
32/// ```
33/// 2. Insert 3 numbers into delay-map with different delays, print them as they timeout
34/// ```no_run
35/// # use delay_handler::DelayHandler;
36/// # use std::time::Duration;
37/// # async fn run() {
38/// let mut handler = DelayHandler::default();
39/// // Adds 1, 2 to the delay-map, with different delays
40/// handler.insert(1, Duration::from_secs(10));
41/// handler.insert(2, Duration::from_secs(5));
42///
43/// // With a delay of ~5s between, the prints should come in the order of 2 and 1.
44/// while let Some(expired) = handler.next().await {
45/// println!("{}", expired);
46/// }
47/// # }
48/// ```
49///
50/// 3. Insert 3 numbers into delay-map with different delays, remove print as delays are timedout
51/// ```no_run
52/// # use delay_handler::DelayHandler;
53/// # use std::time::Duration;
54/// # async fn run() {
55/// let mut handler = DelayHandler::default();
56/// // Adds 1, 2, 3 to the delay-map, each with different delays
57/// handler.insert(1, Duration::from_secs(15));
58/// handler.insert(2, Duration::from_secs(5));
59/// handler.insert(3, Duration::from_secs(10));
60///
61/// // Remove 3 from the delay-map
62/// handler.remove(&3);
63///
64/// // Prints should be in the order of first 2 and ~10s later 1.
65/// while let Some(expired) = handler.next().await {
66/// println!("{}", expired);
67/// }
68/// # }
69/// ```
70pub struct DelayHandler<T> {
71 queue: DelayQueue<T>,
72 map: HashMap<T, Key>,
73}
74
75impl<T> DelayHandler<T>
76where
77 T: Eq + Hash + Clone + Display,
78{
79 /// Insert new timeout into the map and queue if it doesn't already exist.
80 /// If one already exists, don't .
81 pub fn insert(&mut self, item: T, period: Duration) -> bool {
82 match self.map.entry(item.clone()) {
83 Entry::Vacant(v) => {
84 let key = self.queue.insert(item, period);
85 v.insert(key);
86
87 true
88 }
89 _ => false,
90 }
91 }
92
93 /// Prematurely removes timeout from delay-map, if it didn't already exist returns false.
94 pub fn remove(&mut self, item: &T) -> bool {
95 match self.map.remove(item) {
96 Some(key) => {
97 self.queue.remove(&key).into_inner();
98
99 true
100 }
101 _ => false,
102 }
103 }
104
105 /// Remove a key from map if it has timedout and return the name.
106 pub async fn next(&mut self) -> Option<T> {
107 let item = self.queue.next().await?.into_inner();
108 self.map.remove(&item);
109
110 Some(item)
111 }
112
113 /// Check if queue is empty. Could be used as precondition in an async select operation.
114 /// NOTE: The following example assumes usage of `tokio::select`
115 ///
116 /// ```text
117 /// select! {
118 /// ...
119 /// Some(expired) = handler.next(), if !handler.is_empty() => println!("{}", expired)
120 /// ...
121 /// }
122 /// ```
123 pub fn is_empty(&self) -> bool {
124 self.queue.is_empty()
125 }
126}
127
128impl<T> Default for DelayHandler<T>
129where
130 T: Eq + Hash + Clone + Display,
131{
132 fn default() -> Self {
133 Self {
134 queue: DelayQueue::new(),
135 map: HashMap::new(),
136 }
137 }
138}