1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
//! This provides a cache to check rate limits as well as store data for metrics.
//!
//! The cache essentially consists of a time-ordered list of elements. The list is split into two
//! sections, within the enforced time and without.
//!
//!
//! | | Enforced Time |
//! \[x,x,x,x,x,x,x,x,x,x,x,x,x,x,x,x\]
//!
//! The enforced time represents one seconds worth of elements. The target aims to limit the
//! number of elements that can be inserted within the enforced time. The length of the list is
//! determined by the `time_window` this can be longer than one second and can be used by metrics
//! to average results over large values than one second.
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
/// The time window that the size of the cache is enforced for. I.e if the size is 5 and
/// ENFORCED_SIZE_TIME is 1 second, this will allow 5 entries per second. This MUST be less than the
/// cache's `time_window`.
pub const ENFORCED_SIZE_TIME: u64 = 1;
pub struct ReceivedPacket<T> {
/// The source that sent us the packet.
pub content: T,
/// The time the packet was received.
pub received: Instant,
}
pub struct ReceivedPacketCache<T> {
/// The target number of entries per ENFORCED_SIZE_TIME before inserting new elements reports
/// failure. The maximum size of the cache is target*time_window
target: usize,
/// The cache stores `time_window` seconds worth of information to calculate a moving average.
/// This variable keeps track the number of elements in the cache within the
/// ENFORCED_SIZE_TIME.
time_window: u64,
/// This stores the current number of messages that are within the `ENFORCED_SIZE_TIME`.
within_enforced_time: usize,
/// The underlying data structure.
inner: VecDeque<ReceivedPacket<T>>,
}
impl<T> ReceivedPacketCache<T> {
/// Creates a new `ReceivedPacketCache` with a specified size from which no more can enter.
pub fn new(target: usize, time_window: u64) -> Self {
Self {
target,
time_window,
within_enforced_time: 0,
inner: VecDeque::with_capacity(target * time_window as usize),
}
}
/// Remove expired packets. We only keep, `CACHE_TIME` of data in the cache.
pub fn reset(&mut self) {
while let Some(packet) = self.inner.pop_front() {
if packet.received
> Instant::now()
.checked_sub(Duration::from_secs(self.time_window))
.unwrap()
{
// add the packet back and end
self.inner.push_front(packet);
break;
}
}
// update the within_enforced_time
let mut count = 0;
for packet in self.inner.iter().rev() {
if packet.received
> Instant::now()
.checked_sub(Duration::from_secs(ENFORCED_SIZE_TIME))
.unwrap()
{
count += 1;
} else {
break;
}
}
self.within_enforced_time = count;
}
/// Inserts an element into the cache, removing any expired elements.
pub fn cache_insert(&mut self, content: T) -> bool {
self.reset();
self.internal_insert(content)
}
/// Inserts an element into the cache without removing expired elements.
fn internal_insert(&mut self, content: T) -> bool {
if self.within_enforced_time >= self.target {
// Reached the target
false
} else {
let received_packet = ReceivedPacket {
content,
received: Instant::now(),
};
self.inner.push_back(received_packet);
self.within_enforced_time += 1;
true
}
}
}
impl<T> std::ops::Deref for ReceivedPacketCache<T> {
type Target = VecDeque<ReceivedPacket<T>>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> std::ops::DerefMut for ReceivedPacketCache<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}