rate_limit_queue/
lib.rs

1#![recursion_limit = "128"]
2
3#[macro_use]
4extern crate delegate;
5
6use std::{
7    collections::VecDeque,
8    iter::Extend,
9    thread,
10    time::{Duration, Instant},
11};
12
13#[cfg(test)]
14mod tests;
15
/// A rate limited queue.
///
/// Elements are kept in FIFO order. Dequeueing is limited to `quantum`
/// elements per `interval`; enqueueing is never limited.
#[derive(Debug, Clone)]
pub struct RateLimitQueue<T> {
    /// Number of elements granted per `interval` when the allowance is refilled.
    quantum: usize,
    /// Time that has to pass since `timepoint` before the allowance is refilled.
    interval: Duration,
    /// The stored elements, oldest at the front.
    queue: VecDeque<T>,
    /// Number of elements that may still be handed out before the limit applies.
    allowance: usize,
    /// Instant of creation or of the most recent allowance refill.
    timepoint: Instant,
}
24
/// Outcome of a single `try_dequeue()` call.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum DequeueResult<T> {
    /// An element was removed from the front of the queue.
    Data(T),
    /// The queue holds no elements.
    Empty,
    /// The allowance is used up; the payload is the time left until it is refilled.
    Limit(Duration),
}
32
33impl<T> DequeueResult<T> {
34    pub fn is_data(&self) -> bool {
35        match self {
36            DequeueResult::Data(_) => true,
37            _ => false,
38        }
39    }
40
41    pub fn is_empty(&self) -> bool {
42        match self {
43            DequeueResult::Empty => true,
44            _ => false,
45        }
46    }
47
48    pub fn is_limit(&self) -> bool {
49        match self {
50            DequeueResult::Limit(_) => true,
51            _ => false,
52        }
53    }
54}
55
56impl<T> From<Option<T>> for DequeueResult<T> {
57    fn from(opt: Option<T>) -> DequeueResult<T> {
58        opt.map_or(DequeueResult::Empty, DequeueResult::Data)
59    }
60}
61
62impl<T> Into<Option<T>> for DequeueResult<T> {
63    fn into(self) -> Option<T> {
64        match self {
65            DequeueResult::Data(value) => Some(value),
66            DequeueResult::Empty | DequeueResult::Limit(_) => None,
67        }
68    }
69}
70
71impl<T> RateLimitQueue<T> {
72    /// Creates an empty queue.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// # use std::time::Duration;
78    /// use rate_limit_queue::RateLimitQueue;
79    ///
80    /// let queue: RateLimitQueue<i32> = RateLimitQueue::new(100, Duration::from_secs(1));
81    /// ```
82    #[inline]
83    pub fn new(quantum: usize, interval: Duration) -> RateLimitQueue<T> {
84        RateLimitQueue::with_capacity(0, quantum, interval)
85    }
86
87    /// Creates an empty queue with space for at least `n` elements.
88    ///
89    /// # Examples
90    ///
91    /// ```
92    /// # use std::time::Duration;
93    /// use rate_limit_queue::RateLimitQueue;
94    ///
95    /// let queue: RateLimitQueue<u32> = RateLimitQueue::with_capacity(10, 100, Duration::from_secs(1));
96    /// ```
97    #[inline]
98    pub fn with_capacity(cap: usize, quantum: usize, interval: Duration) -> RateLimitQueue<T> {
99        RateLimitQueue {
100            quantum,
101            interval,
102            queue: VecDeque::with_capacity(cap),
103            allowance: quantum,
104            timepoint: Instant::now(),
105        }
106    }
107
108    delegate! {
109        target self.queue {
110            /// Returns the number of elements the queue can hold without reallocating.
111            pub fn capacity(&mut self) -> usize;
112            /// Reserves the minimum capacity for exactly `additional` more elements.
113            ///
114            /// # Panics
115            ///
116            /// Panics if the new capacity overflows `usize`.
117            pub fn reserve_exact(&mut self, additional: usize);
118            /// Reserves capacity for at least `additional` more elements.
119            ///
120            /// # Panics
121            ///
122            /// Panics if the new capacity overflows `usize`.
123            pub fn reserve(&mut self, additional: usize);
124            /// Shrinks the capacity of the queue as much as possible.
125            pub fn shrink_to_fit(&mut self);
126            /// Shortens the queue, dropping excess elements from the back.
127            pub fn truncate(&mut self, len: usize);
128            /// Returns the number of elements in the queue.
129            pub fn len(&self) -> usize;
130            /// Returns `true` if the queue is empty.
131            pub fn is_empty(&self) -> bool;
132        }
133    }
134
135    /// Changes the quantum.
136    pub fn set_quantum(&mut self, quantum: usize) {
137        self.quantum = quantum;
138    }
139
140    /// Changes the interval.
141    pub fn set_interval(&mut self, interval: Duration) {
142        self.interval = interval;
143    }
144
145    /// Appends an element to the back of the queue.
146    ///
147    /// # Examples
148    ///
149    /// ```
150    /// # use std::time::Duration;
151    /// use rate_limit_queue::RateLimitQueue;
152    ///
153    /// let mut queue = RateLimitQueue::new(100, Duration::from_secs(1));
154    /// queue.enqueue(1);
155    /// queue.enqueue(2);
156    /// ```
157    pub fn enqueue(&mut self, value: T) {
158        self.queue.push_back(value);
159    }
160
161    /// Removes the first element and returns it, or `None` if the queue is empty.
162    ///
163    /// Sleeps if the limit has been reached.
164    ///
165    /// # Examples
166    ///
167    /// ```
168    /// # use std::time::Duration;
169    /// use rate_limit_queue::RateLimitQueue;
170    ///
171    /// let mut queue = RateLimitQueue::new(100, Duration::from_secs(1));
172    /// queue.enqueue(1);
173    /// queue.enqueue(2);
174    ///
175    /// assert_eq!(queue.dequeue(), Some(1));
176    /// assert_eq!(queue.dequeue(), Some(2));
177    /// ```
178    pub fn dequeue(&mut self) -> Option<T> {
179        match self.try_dequeue() {
180            DequeueResult::Data(value) => Some(value),
181            DequeueResult::Empty => None,
182            DequeueResult::Limit(rest) => {
183                thread::sleep(rest);
184
185                if let DequeueResult::Data(value) = self.try_dequeue() {
186                    Some(value)
187                } else {
188                    unreachable!()
189                }
190            }
191        }
192    }
193
194    /// Tries to remove the first element and return it.
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// # use std::time::Duration;
200    /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
201    ///
202    /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
203    /// queue.enqueue(1);
204    /// queue.enqueue(2);
205    ///
206    /// assert_eq!(queue.try_dequeue(), DequeueResult::Data(1));
207    /// assert_eq!(queue.try_dequeue(), DequeueResult::Data(2));
208    /// assert_eq!(queue.try_dequeue(), DequeueResult::Empty);
209    ///
210    /// queue.enqueue(3);
211    /// assert!(queue.try_dequeue().is_limit());
212    /// ```
213    pub fn try_dequeue(&mut self) -> DequeueResult<T> {
214        if self.queue.is_empty() {
215            return DequeueResult::Empty;
216        }
217
218        if self.allowance > 0 {
219            self.allowance -= 1;
220            return self.queue.pop_front().into();
221        }
222
223        let now = Instant::now();
224        let elapsed = now.duration_since(self.timepoint);
225
226        match self.interval.checked_sub(elapsed) {
227            Some(rest) => DequeueResult::Limit(rest),
228            None => {
229                self.allowance = self.quantum - 1;
230                self.timepoint = now;
231                self.queue.pop_front().into()
232            }
233        }
234    }
235
236    /// Returns a front-to-back iterator.
237    ///
238    /// # Examples
239    ///
240    /// ```
241    /// # use std::time::Duration;
242    /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
243    ///
244    /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
245    /// queue.enqueue(1);
246    /// queue.enqueue(2);
247    ///
248    /// let b: &[_] = &[&1, &2];
249    /// let c: Vec<&i32> = queue.iter().collect();
250    /// assert_eq!(&c[..], b);
251    /// ```
252    pub fn iter(&mut self) -> impl Iterator<Item = &T> {
253        let allowance = &mut self.allowance;
254
255        self.queue
256            .iter()
257            .take(*allowance)
258            .inspect(move |_| *allowance -= 1)
259    }
260
261    /// Returns a front-to-back iterator that returns mutable references.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// # use std::time::Duration;
267    /// use rate_limit_queue::{RateLimitQueue, DequeueResult};
268    ///
269    /// let mut queue = RateLimitQueue::new(2, Duration::from_secs(10));
270    /// queue.enqueue(1);
271    /// queue.enqueue(2);
272    ///
273    /// let b: &[_] = &[&mut 1, &mut 2];
274    /// let c: Vec<&mut i32> = queue.iter_mut().collect();
275    /// assert_eq!(&c[..], b);
276    /// ```
277    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> {
278        let allowance = &mut self.allowance;
279
280        self.queue
281            .iter_mut()
282            .take(*allowance)
283            .inspect(move |_| *allowance -= 1)
284    }
285}
286
287impl<T> Extend<T> for RateLimitQueue<T> {
288    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
289        self.queue.extend(iter)
290    }
291}