1use std::time::Duration;
39
40#[cfg(not(feature = "tokio"))]
41use std::time::Instant;
42
43#[cfg(feature = "tokio")]
44use tokio::time::Instant;
45
46#[cfg(feature = "vecdeque")]
47use std::collections::VecDeque;
48
/// A FIFO queue that stores every element together with the [`Instant`] at which it was
/// pushed, so that elements older than a configured time-to-live can be discarded.
///
/// The backing storage is chosen at compile time: the `doublestack` feature uses two
/// `Vec`s, the `vecdeque` feature uses a single `VecDeque`.
#[derive(Debug)]
pub struct TtlQueue<T> {
    /// Age at which an element counts as expired.
    ttl: Duration,
    /// `doublestack` backend: stack that receives newly pushed entries.
    #[cfg(feature = "doublestack")]
    stack_1: Vec<(Instant, T)>,
    /// `doublestack` backend: stack that entries are popped from; it is refilled from
    /// `stack_1` when it runs empty.
    #[cfg(feature = "doublestack")]
    stack_2: Vec<(Instant, T)>,
    /// `vecdeque` backend: entries in insertion order.
    #[cfg(feature = "vecdeque")]
    queue: VecDeque<(Instant, T)>,
}
87
88impl<T> TtlQueue<T> {
89 pub fn new(ttl: Duration) -> Self {
91 Self {
92 ttl,
93 #[cfg(feature = "doublestack")]
94 stack_1: Vec::new(),
95 #[cfg(feature = "doublestack")]
96 stack_2: Vec::new(),
97 #[cfg(feature = "vecdeque")]
98 queue: VecDeque::new(),
99 }
100 }
101
102 pub fn with_capacity(ttl: Duration, capacity: usize) -> Self {
104 Self {
105 ttl,
106 #[cfg(feature = "doublestack")]
107 stack_1: Vec::with_capacity(capacity),
108 #[cfg(feature = "doublestack")]
109 stack_2: Vec::with_capacity(capacity),
110 #[cfg(feature = "vecdeque")]
111 queue: VecDeque::with_capacity(capacity),
112 }
113 }
114
115 pub fn push_back(&mut self, element: T) {
117 self.push_back_entry(Instant::now(), element)
118 }
119
120 fn push_back_entry(&mut self, instant: Instant, element: T) {
122 let entry = (instant, element);
123 #[cfg(feature = "doublestack")]
124 {
125 self.stack_1.push(entry);
126 }
127 #[cfg(feature = "vecdeque")]
128 {
129 self.queue.push_back(entry)
130 }
131 }
132
133 pub fn refresh_and_push_back(&mut self, element: T) -> usize {
136 let count = self.refresh();
137 self.push_back(element);
138 count + 1
139 }
140
141 pub fn pop_front(&mut self) -> Option<(Instant, T)> {
144 #[cfg(feature = "doublestack")]
145 {
146 self.ensure_stack_full(false);
147 self.stack_2.pop()
148 }
149 #[cfg(feature = "vecdeque")]
150 {
151 self.queue.pop_front()
152 }
153 }
154
155 pub fn peek_front(&mut self) -> Option<&(Instant, T)> {
157 #[cfg(feature = "doublestack")]
158 {
159 self.ensure_stack_full(false);
160 self.stack_2.first()
161 }
162 #[cfg(feature = "vecdeque")]
163 {
164 self.queue.front()
165 }
166 }
167
168 #[cfg(feature = "doublestack")]
169 fn ensure_stack_full(&mut self, force: bool) {
170 if self.stack_2.is_empty() || force {
171 while let Some(item) = self.stack_1.pop() {
172 self.stack_2.push(item);
173 }
174 }
175 }
176
177 pub fn len(&self) -> usize {
182 #[cfg(feature = "doublestack")]
183 {
184 self.stack_1.len() + self.stack_2.len()
185 }
186 #[cfg(feature = "vecdeque")]
187 {
188 self.queue.len()
189 }
190 }
191
192 pub fn is_empty(&self) -> bool {
198 #[cfg(feature = "doublestack")]
199 {
200 self.stack_1.is_empty() && self.stack_2.is_empty()
201 }
202 #[cfg(feature = "vecdeque")]
203 {
204 self.queue.is_empty()
205 }
206 }
207
208 #[cfg(feature = "doublestack")]
210 pub fn refresh(&mut self) -> usize {
211 let now = Instant::now();
212
213 while let Some((instant, _element)) = self.stack_2.first() {
214 if (now - *instant) < self.ttl {
215 break;
216 }
217
218 let _result = self.stack_2.pop();
219 debug_assert!(_result.is_some());
220 }
221
222 if !self.stack_2.is_empty() {
223 return self.len();
224 }
225
226 while let Some((instant, _element)) = self.stack_1.first() {
227 if (now - *instant) < self.ttl {
228 break;
229 }
230
231 let _result = self.stack_1.pop();
232 debug_assert!(_result.is_some());
233 }
234
235 debug_assert_eq!(self.stack_1.len(), self.len());
236 self.stack_1.len()
237 }
238
239 #[cfg(feature = "vecdeque")]
241 pub fn refresh(&mut self) -> usize {
242 let now = Instant::now();
243
244 while let Some((instant, _element)) = self.queue.front() {
245 if (now - *instant) < self.ttl {
246 break;
247 }
248
249 let _result = self.queue.pop_front();
250 debug_assert!(_result.is_some());
251 }
252
253 self.queue.len()
254 }
255
256 pub fn iter(&self) -> impl Iterator<Item = &(Instant, T)> {
258 #[cfg(feature = "doublestack")]
259 {
260 return DoubleStackIterator::new(&self);
261 }
262 #[cfg(feature = "vecdeque")]
263 {
264 self.queue.iter()
265 }
266 }
267
268 pub fn avg_delta(&self) -> Duration {
270 if self.len() <= 1 {
271 return Duration::ZERO;
272 }
273
274 let (count, sum) = self
275 .iter()
276 .zip(self.iter().skip(1))
277 .fold((0, Duration::ZERO), |(count, sum), (lhs, rhs)| {
278 (count + 1, sum + (rhs.0 - lhs.0))
279 });
280
281 debug_assert_ne!(count, 0);
282 sum / count
283 }
284}
285
286impl<T> IntoIterator for TtlQueue<T> {
287 type Item = (Instant, T);
288
289 #[cfg(feature = "vecdeque")]
290 type IntoIter = std::collections::vec_deque::IntoIter<Self::Item>;
291
292 #[cfg(feature = "doublestack")]
293 type IntoIter = std::iter::Chain<
294 std::iter::Rev<std::vec::IntoIter<Self::Item>>,
295 std::vec::IntoIter<Self::Item>,
296 >;
297
298 fn into_iter(self) -> Self::IntoIter {
299 #[cfg(feature = "vecdeque")]
300 {
301 self.queue.into_iter()
302 }
303 #[cfg(feature = "doublestack")]
304 {
305 self.stack_2
306 .into_iter()
307 .rev()
308 .chain(self.stack_1.into_iter())
309 }
310 }
311}
312
/// Borrowing iterator over a `doublestack`-backed [`TtlQueue`].
///
/// It first walks `stack_2` in reverse and then `stack_1` front-to-back.
#[cfg(feature = "doublestack")]
pub struct DoubleStackIterator<'a, T> {
    /// The queue being iterated; needed to start on `stack_1` once `stack_2` is done.
    queue: &'a TtlQueue<T>,
    /// Which of the two stacks is currently being walked (or `Done`).
    stage: DoubleStackIteratorStage<'a, T>,
}
318
/// Progress marker of a [`DoubleStackIterator`].
#[cfg(feature = "doublestack")]
enum DoubleStackIteratorStage<'a, T> {
    /// Walking a reversed slice iterator (constructed over `stack_2`).
    First(std::iter::Rev<std::slice::Iter<'a, (Instant, T)>>),
    /// Walking a forward slice iterator (constructed over `stack_1`).
    Second(std::slice::Iter<'a, (Instant, T)>),
    /// Both stacks have been walked; no further items are produced.
    Done,
}
325
/// Yields the next item of whichever slice iterator the stage currently wraps.
#[cfg(feature = "doublestack")]
impl<'a, T> Iterator for DoubleStackIteratorStage<'a, T> {
    type Item = &'a (Instant, T);

    fn next(&mut self) -> Option<Self::Item> {
        // Delegates to the wrapped iterator; switching stages is the caller's job.
        match self {
            Self::Done => None,
            Self::First(reversed) => reversed.next(),
            Self::Second(forward) => forward.next(),
        }
    }
}
338
#[cfg(feature = "doublestack")]
impl<'a, T> DoubleStackIterator<'a, T> {
    /// Creates an iterator over `queue`, starting with `stack_2` walked in reverse.
    pub fn new(queue: &'a TtlQueue<T>) -> Self {
        let reversed_pop_stack = queue.stack_2.iter().rev();
        let stage = DoubleStackIteratorStage::First(reversed_pop_stack);

        Self { queue, stage }
    }
}
348
/// Walks `stack_2` in reverse, then `stack_1` front-to-back.
///
/// Once exhausted the iterator keeps returning `None`; polling it again is safe.
#[cfg(feature = "doublestack")]
impl<'a, T> Iterator for DoubleStackIterator<'a, T> {
    type Item = &'a (Instant, T);

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(element) = self.stage.next() {
                return Some(element);
            }

            // The current stage is exhausted: advance to the next one.
            match self.stage {
                DoubleStackIteratorStage::First(..) => {
                    self.stage = DoubleStackIteratorStage::Second(self.queue.stack_1.iter());
                }
                // `Done` is handled explicitly so that polling an already exhausted
                // iterator returns `None` instead of tripping an assertion.
                DoubleStackIteratorStage::Second(..) | DoubleStackIteratorStage::Done => {
                    self.stage = DoubleStackIteratorStage::Done;
                    return None;
                }
            }
        }
    }
}
371
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Entries are counted while fresh and disappear once the TTL has elapsed.
    #[test]
    fn it_works() {
        let mut queue = TtlQueue::new(Duration::from_millis(50));
        queue.push_back(10);
        queue.push_back(20);
        queue.push_back(30);
        assert_eq!(queue.refresh(), 3);

        let value = queue.pop_front().unwrap();
        assert_eq!(value.1, 10);

        assert_eq!(queue.refresh(), 2);

        thread::sleep(Duration::from_millis(50));
        assert_eq!(queue.refresh(), 0);
    }

    /// Borrowing iteration yields every element exactly once, oldest first.
    #[test]
    fn iter_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        for i in 0..1000usize {
            queue.push_back(i * 10);

            // Split the elements across both stacks so that both iterator stages run.
            #[cfg(feature = "doublestack")]
            {
                if i == 500 {
                    queue.ensure_stack_full(true);
                }
            }
        }

        // Comparing whole vectors also catches missing or surplus elements.
        let values: Vec<usize> = queue.iter().map(|(_instant, value)| *value).collect();
        let expected: Vec<usize> = (0..1000).map(|i| i * 10).collect();
        assert_eq!(values, expected);
    }

    /// Consuming iteration yields every element exactly once, oldest first.
    #[test]
    fn into_iter_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        for i in 0..100usize {
            queue.push_back(i * 10);

            // Split the elements across both stacks so that both halves of the chain run.
            #[cfg(feature = "doublestack")]
            {
                if i == 50 {
                    queue.ensure_stack_full(true);
                }
            }
        }

        let values: Vec<usize> = queue.into_iter().map(|(_instant, value)| value).collect();
        let expected: Vec<usize> = (0..100).map(|i| i * 10).collect();
        assert_eq!(values, expected);
    }

    /// Entries spaced exactly one second apart average to one second.
    #[test]
    fn avg_duration_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        let now = Instant::now();

        for i in 0..10 {
            queue.push_back_entry(now + Duration::from_secs(i), ());
        }

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::from_secs(1));
    }

    #[test]
    fn avg_duration_with_zero_inputs_works() {
        let queue = TtlQueue::<()>::new(Duration::MAX);

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::ZERO);
    }

    #[test]
    fn avg_duration_with_one_inputs_works() {
        let mut queue = TtlQueue::new(Duration::MAX);
        queue.push_back(());

        let avg = queue.avg_delta();
        assert_eq!(avg, Duration::ZERO);
    }

    /// Uses the queue as a frames-per-second counter with a one second window.
    ///
    /// Plain `assert!` is used so the checks also run under `cargo test --release`.
    #[test]
    fn fps_counter() {
        let mut fps_counter = TtlQueue::new(Duration::from_secs(1));

        for _i in 0..50 {
            let fps = fps_counter.refresh_and_push_back(());
            assert!(fps >= 1, "unexpected fps: {}", fps);

            thread::sleep(Duration::from_millis(19));
        }

        let fps = fps_counter.refresh();
        assert!((45..=55).contains(&fps), "unexpected fps: {}", fps);

        let delta = fps_counter.avg_delta();
        assert!(
            delta >= Duration::from_millis(19) && delta <= Duration::from_millis(21),
            "unexpected average delta: {:?}",
            delta
        );
    }
}