rate_limit_queue/lib.rs
1#![recursion_limit = "128"]
2
3#[macro_use]
4extern crate delegate;
5
6use std::{
7 collections::VecDeque,
8 iter::Extend,
9 thread,
10 time::{Duration, Instant},
11};
12
13#[cfg(test)]
14mod tests;
15
16/// A rate limited queue.
17pub struct RateLimitQueue<T> {
18 quantum: usize,
19 interval: Duration,
20 queue: VecDeque<T>,
21 allowance: usize,
22 timepoint: Instant,
23}
24
25/// A type that represents result of `try_dequeue()`.
26#[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Eq, Ord, Hash)]
27pub enum DequeueResult<T> {
28 Data(T),
29 Empty,
30 Limit(Duration),
31}
32
33impl<T> DequeueResult<T> {
34 pub fn is_data(&self) -> bool {
35 match self {
36 DequeueResult::Data(_) => true,
37 _ => false,
38 }
39 }
40
41 pub fn is_empty(&self) -> bool {
42 match self {
43 DequeueResult::Empty => true,
44 _ => false,
45 }
46 }
47
48 pub fn is_limit(&self) -> bool {
49 match self {
50 DequeueResult::Limit(_) => true,
51 _ => false,
52 }
53 }
54}
55
56impl<T> From<Option<T>> for DequeueResult<T> {
57 fn from(opt: Option<T>) -> DequeueResult<T> {
58 opt.map_or(DequeueResult::Empty, DequeueResult::Data)
59 }
60}
61
62impl<T> Into<Option<T>> for DequeueResult<T> {
63 fn into(self) -> Option<T> {
64 match self {
65 DequeueResult::Data(value) => Some(value),
66 DequeueResult::Empty | DequeueResult::Limit(_) => None,
67 }
68 }
69}
70
71impl<T> RateLimitQueue<T> {
72 /// Creates an empty queue.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// # use std::time::Duration;
78 /// use rate_limit_queue::RateLimitQueue;
79 ///
80 /// let queue: RateLimitQueue<i32> = RateLimitQueue::new(100, Duration::from_secs(1));
81 /// ```
82 #[inline]
83 pub fn new(quantum: usize, interval: Duration) -> RateLimitQueue<T> {
84 RateLimitQueue::with_capacity(0, quantum, interval)
85 }
86
87 /// Creates an empty queue with space for at least `n` elements.
88 ///
89 /// # Examples
90 ///
91 /// ```
92 /// # use std::time::Duration;
93 /// use rate_limit_queue::RateLimitQueue;
94 ///
95 /// let queue: RateLimitQueue<u32> = RateLimitQueue::with_capacity(10, 100, Duration::from_secs(1));
96 /// ```
97 #[inline]
98 pub fn with_capacity(cap: usize, quantum: usize, interval: Duration) -> RateLimitQueue<T> {
99 RateLimitQueue {
100 quantum,
101 interval,
102 queue: VecDeque::with_capacity(cap),
103 allowance: quantum,
104 timepoint: Instant::now(),
105 }
106 }
107
108 delegate! {
109 target self.queue {
110 /// Returns the number of elements the queue can hold without reallocating.
111 pub fn capacity(&mut self) -> usize;
112 /// Reserves the minimum capacity for exactly `additional` more elements.
113 ///
114 /// # Panics
115 ///
116 /// Panics if the new capacity overflows `usize`.
117 pub fn reserve_exact(&mut self, additional: usize);
118 /// Reserves capacity for at least `additional` more elements.
119 ///
120 /// # Panics
121 ///
122 /// Panics if the new capacity overflows `usize`.
123 pub fn reserve(&mut self, additional: usize);
124 /// Shrinks the capacity of the queue as much as possible.
125 pub fn shrink_to_fit(&mut self);
126 /// Shortens the queue, dropping excess elements from the back.
127 pub fn truncate(&mut self, len: usize);
128 /// Returns the number of elements in the queue.
129 pub fn len(&self) -> usize;
130 /// Returns `true` if the queue is empty.
131 pub fn is_empty(&self) -> bool;
132 }
133 }
134
135 /// Changes the quantum.
136 pub fn set_quantum(&mut self, quantum: usize) {
137 self.quantum = quantum;
138 }
139
140 /// Changes the interval.
141 pub fn set_interval(&mut self, interval: Duration) {
142 self.interval = interval;
143 }
144
145 /// Appends an element to the back of the queue.
146 ///
147 /// # Examples
148 ///
149 /// ```
150 /// # use std::time::Duration;
151 /// use rate_limit_queue::RateLimitQueue;
152 ///
153 /// let mut queue = RateLimitQueue::new(100, Duration::from_secs(1));
154 /// queue.enqueue(1);
155 /// queue.enqueue(2);
156 /// ```
157 pub fn enqueue(&mut self, value: T) {
158 self.queue.push_back(value);
159 }
160
161 /// Removes the first element and returns it, or `None` if the queue is empty.
162 ///
163 /// Sleeps if the limit has been reached.
164 ///
165 /// # Examples
166 ///
167 /// ```
168 /// # use std::time::Duration;
169 /// use rate_limit_queue::RateLimitQueue;
170 ///
171 /// let mut queue = RateLimitQueue::new(100, Duration::from_secs(1));
172 /// queue.enqueue(1);
173 /// queue.enqueue(2);
174 ///
175 /// assert_eq!(queue.dequeue(), Some(1));
176 /// assert_eq!(queue.dequeue(), Some(2));
177 /// ```
178 pub fn dequeue(&mut self) -> Option<T> {
179 match self.try_dequeue() {
180 DequeueResult::Data(value) => Some(value),
181 DequeueResult::Empty => None,
182 DequeueResult::Limit(rest) => {
183 thread::sleep(rest);
184
185 if let DequeueResult::Data(value) = self.try_dequeue() {
186 Some(value)
187 } else {
188 unreachable!()
189 }
190 }
191 }
192 }
193
194 /// Tries to remove the first element and return it.
195 ///
196 /// # Examples
197 ///
198 /// ```
199 /// # use std::time::Duration;
200 /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
201 ///
202 /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
203 /// queue.enqueue(1);
204 /// queue.enqueue(2);
205 ///
206 /// assert_eq!(queue.try_dequeue(), DequeueResult::Data(1));
207 /// assert_eq!(queue.try_dequeue(), DequeueResult::Data(2));
208 /// assert_eq!(queue.try_dequeue(), DequeueResult::Empty);
209 ///
210 /// queue.enqueue(3);
211 /// assert!(queue.try_dequeue().is_limit());
212 /// ```
213 pub fn try_dequeue(&mut self) -> DequeueResult<T> {
214 if self.queue.is_empty() {
215 return DequeueResult::Empty;
216 }
217
218 if self.allowance > 0 {
219 self.allowance -= 1;
220 return self.queue.pop_front().into();
221 }
222
223 let now = Instant::now();
224 let elapsed = now.duration_since(self.timepoint);
225
226 match self.interval.checked_sub(elapsed) {
227 Some(rest) => DequeueResult::Limit(rest),
228 None => {
229 self.allowance = self.quantum - 1;
230 self.timepoint = now;
231 self.queue.pop_front().into()
232 }
233 }
234 }
235
236 /// Returns a front-to-back iterator.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// # use std::time::Duration;
242 /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
243 ///
244 /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
245 /// queue.enqueue(1);
246 /// queue.enqueue(2);
247 ///
248 /// let b: &[_] = &[&1, &2];
249 /// let c: Vec<&i32> = queue.iter().collect();
250 /// assert_eq!(&c[..], b);
251 /// ```
252 pub fn iter(&mut self) -> impl Iterator<Item = &T> {
253 let allowance = &mut self.allowance;
254
255 self.queue
256 .iter()
257 .take(*allowance)
258 .inspect(move |_| *allowance -= 1)
259 }
260
261 /// Returns a front-to-back iterator that returns mutable references.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// # use std::time::Duration;
267 /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
268 ///
269 /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
270 /// queue.enqueue(1);
271 /// queue.enqueue(2);
272 ///
273 /// let b: &[_] = &[&mut 1, &mut 2];
274 /// let c: Vec<&mut i32> = queue.iter_mut().collect();
275 /// assert_eq!(&c[..], b);
276 /// ```
277 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
278 let allowance = &mut self.allowance;
279
280 self.queue
281 .iter_mut()
282 .take(*allowance)
283 .inspect(move |_| *allowance -= 1)
284 }
285}
286
287impl<T> Extend<T> for RateLimitQueue<T> {
288 fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
289 self.queue.extend(iter)
290 }
291}