#![allow(missing_docs)]
use std::mem;
use std::time::{Duration, Instant};
use crate::util::already_happened;
/// A buffer of values keyed by a `u64` position, stored in slots addressed by
/// `position % buf.len()`, with age-based eviction.
///
/// The buffer is "inert" (every operation is a no-op or returns nothing) when it
/// has zero slots or when `max_age` is zero.
#[derive(Debug)]
pub struct EvictingBuffer<T> {
/// Slot storage. Each slot is either empty or holds one entry together with
/// the position it was pushed for.
buf: Vec<Option<Entry<T>>>,
/// Entries whose age is strictly greater than this are removed on eviction.
/// A zero duration makes the buffer inert.
max_age: Duration,
/// Upper bound on the number of slots; growing never goes beyond it.
max_size: usize,
/// Position given to the most recent push that stored an entry. This is the
/// latest push, not necessarily the highest position. Reset by `clear`.
last_position: Option<u64>,
/// Position at which the next eviction scan starts. Seeded by the first
/// push; pushes with a lower position are ignored.
next_evict: Option<u64>,
/// The `now` of the latest accepted `maybe_evict` call. Pushes with an
/// earlier timestamp are ignored. Starts out as `already_happened()`.
last_timeout: Instant,
}
/// One occupied slot of an `EvictingBuffer`.
#[derive(Debug)]
struct Entry<T> {
/// The position this entry was pushed for. Lookups compare against it
/// because several positions map to the same slot index.
position: u64,
/// The timestamp supplied to `push`; eviction measures age from it.
timestamp: Instant,
/// The stored payload.
value: T,
}
/// Builds a vector of `len` empty (`None`) slots.
///
/// `T` does not need to be `Clone`, which is why this does not use
/// `vec![None; len]`.
fn prepare_buf<T>(len: usize) -> Vec<Option<T>> {
    (0..len).map(|_| None).collect()
}
impl<T> EvictingBuffer<T> {
    /// Creates a buffer with `initial_size` empty slots.
    ///
    /// * `initial_size` - number of slots to start with. Zero makes the buffer
    ///   inert.
    /// * `max_age` - entries strictly older than this are removed by
    ///   [`maybe_evict`](Self::maybe_evict). Zero makes the buffer inert.
    /// * `max_size` - the slot count is never grown beyond this.
    pub fn new(initial_size: usize, max_age: Duration, max_size: usize) -> Self {
        Self {
            buf: prepare_buf(initial_size),
            max_age,
            max_size,
            last_position: None,
            next_evict: None,
            last_timeout: already_happened(),
        }
    }

    /// Maps a position to its slot index (`position % slot count`).
    ///
    /// Panics (remainder by zero) when the buffer has no slots; callers guard
    /// with [`is_inert`](Self::is_inert) first.
    fn index_for_position(&self, position: u64) -> usize {
        (position % self.buf.len() as u64) as usize
    }

    /// True when the buffer can never hold anything: no slots, or a zero
    /// `max_age`.
    #[inline]
    fn is_inert(&self) -> bool {
        self.buf.is_empty() || self.max_age.is_zero()
    }

    /// Stores `value` under `position`.
    ///
    /// The push is silently ignored when the buffer is inert, when `timestamp`
    /// is earlier than the latest eviction time, or when `position` is below
    /// the eviction cursor. The first push seeds the eviction cursor.
    ///
    /// If the target slot is occupied by a different position the buffer is
    /// grown (up to `max_size`); whatever then occupies the target slot is
    /// overwritten.
    pub fn push(&mut self, position: u64, timestamp: Instant, value: T) {
        if self.is_inert() {
            return;
        }

        if timestamp < self.last_timeout {
            return;
        }

        // The first push decides where eviction starts.
        let next_evict = *self.next_evict.get_or_insert(position);
        if position < next_evict {
            return;
        }

        let mut index = self.index_for_position(position);
        let collides = matches!(&self.buf[index], Some(entry) if entry.position != position);
        if collides {
            // Growing changes the slot count, so the index must be recomputed.
            self.grow();
            index = self.index_for_position(position);
        }

        self.last_position = Some(position);
        self.buf[index] = Some(Entry {
            position,
            timestamp,
            value,
        });
    }

    /// Returns the value stored for exactly `position`, if any.
    // NOTE(review): this allow was present without explanation; presumably it
    // silences a dead-code warning in builds where nothing calls `get`.
    // Kept so warning behaviour does not change.
    #[allow(unused)]
    pub fn get(&self, position: u64) -> Option<&T> {
        if self.is_inert() {
            return None;
        }

        let index = self.index_for_position(position);
        self.buf[index]
            .as_ref()
            .filter(|entry| entry.position == position)
            .map(|entry| &entry.value)
    }

    /// Returns a mutable reference to the value stored for exactly
    /// `position`, if any.
    pub fn get_mut(&mut self, position: u64) -> Option<&mut T> {
        if self.is_inert() {
            return None;
        }

        let index = self.index_for_position(position);
        self.buf[index]
            .as_mut()
            .filter(|entry| entry.position == position)
            .map(|entry| &mut entry.value)
    }

    /// Runs eviction as of `now`.
    ///
    /// Does nothing when the buffer is inert or when `now` is earlier than the
    /// previous eviction time. Otherwise `now` becomes the new lower bound for
    /// push timestamps.
    pub fn maybe_evict(&mut self, now: Instant) {
        if self.is_inert() {
            return;
        }

        if now < self.last_timeout {
            return;
        }

        self.last_timeout = now;
        self.evict(now);
    }

    /// Removes entries older than `max_age`, scanning slots from the eviction
    /// cursor and stopping at the first entry that is still young enough.
    ///
    /// Each slot is visited at most once. If the scan stops at a young entry
    /// the cursor is left on that position. If every slot turns out empty or
    /// expired, the cursor moves only just past the last slot that actually
    /// held an expired entry, and stays put when nothing was evicted.
    /// Advancing a full lap over empty slots would make later pushes for
    /// positions that were never seen get rejected.
    fn evict(&mut self, now: Instant) {
        let Some(start_position) = self.next_evict else {
            return;
        };

        let slot_count = self.buf.len() as u64;
        let mut resume_at = start_position;

        for offset in 0..slot_count {
            let position = start_position + offset;
            let index = self.index_for_position(position);

            let Some(entry) = &self.buf[index] else {
                // Empty slot, keep looking.
                continue;
            };

            let age = now.saturating_duration_since(entry.timestamp);
            if age <= self.max_age {
                // Oldest surviving entry found; eviction resumes here.
                self.next_evict = Some(position);
                return;
            }

            self.buf[index] = None;
            resume_at = position + 1;
        }

        // Every slot is empty now.
        self.next_evict = Some(resume_at);
    }

    /// Grows the slot vector by roughly a third (at least one slot), capped at
    /// `max_size`, and re-slots the existing entries for the new length.
    ///
    /// Does nothing once `max_size` is reached. If two existing entries map to
    /// the same slot under the new length, the one placed later replaces the
    /// earlier one.
    fn grow(&mut self) {
        let current = self.buf.len();
        if current >= self.max_size {
            return;
        }

        let new_size = self
            .max_size
            .min((current * 133) / 100)
            .max(current + 1);

        let old_buffer = mem::replace(&mut self.buf, prepare_buf(new_size));
        for entry in old_buffer.into_iter().flatten() {
            let index = self.index_for_position(entry.position);
            self.buf[index] = Some(entry);
        }
    }

    /// True when a value is stored for exactly `position`.
    ///
    /// An occupied slot is not enough: several positions share a slot, so the
    /// stored position has to match as well.
    pub fn contains(&self, position: u64) -> bool {
        self.get(position).is_some()
    }

    /// Position of the most recent push that stored an entry, or `None` when
    /// the buffer is inert or nothing has been stored since creation/`clear`.
    ///
    /// The entry itself may already have been evicted or overwritten.
    pub fn last_position(&self) -> Option<u64> {
        if self.is_inert() {
            return None;
        }

        self.last_position
    }

    /// Value of the most recently pushed entry, if it is still in the buffer.
    pub fn last(&self) -> Option<&T> {
        if self.is_inert() {
            return None;
        }

        let last = self.last_position?;
        self.get(last)
    }

    /// Empties every slot and resets all bookkeeping to the freshly-created
    /// state. The slot count is kept.
    pub fn clear(&mut self) {
        for slot in self.buf.iter_mut() {
            *slot = None;
        }

        self.last_position = None;
        self.next_evict = None;
        self.last_timeout = already_happened();
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Maximum entry age shared by all tests.
    const MAX_AGE: Duration = Duration::from_secs(10);

    /// `base` shifted forward by a whole number of seconds.
    fn after(base: Instant, secs: u64) -> Instant {
        base + Duration::from_secs(secs)
    }

    /// Snapshot of the raw slot contents, in slot order.
    fn buffer_cmp(b: &EvictingBuffer<char>) -> Vec<Option<char>> {
        let mut values = Vec::with_capacity(b.buf.len());
        for slot in &b.buf {
            values.push(slot.as_ref().map(|entry| entry.value));
        }
        values
    }

    #[test]
    fn push_and_get() {
        let mut buffer = EvictingBuffer::new(1, MAX_AGE, 10);
        let start = Instant::now();

        buffer.push(5, start, 'A');

        // Both positions share the single slot, but only 5 was stored.
        assert_eq!(buffer.index_for_position(5), buffer.index_for_position(3));
        assert_eq!(buffer.get(5), Some(&'A'));
        assert_eq!(buffer.get(3), None);
    }

    #[test]
    fn push_over_capacity() {
        let mut buffer = EvictingBuffer::new(2, MAX_AGE, 10);
        let start = Instant::now();

        for (offset, (position, value)) in [(5, 'A'), (6, 'B'), (7, 'C')].into_iter().enumerate() {
            buffer.push(position, after(start, offset as u64), value);
        }

        assert_eq!(buffer.get(5), Some(&'A'));
        assert_eq!(buffer.get(6), Some(&'B'));
        assert_eq!(buffer.get(7), Some(&'C'));
    }

    #[test]
    fn push_before_next_evict() {
        let mut buffer = EvictingBuffer::new(2, MAX_AGE, 10);
        let start = Instant::now();

        buffer.push(6, after(start, 0), 'B');
        assert_eq!(buffer.next_evict, Some(6));

        // Positions below the eviction cursor are dropped.
        buffer.push(5, after(start, 1), 'A');
        assert_eq!(buffer.get(5), None);
    }

    #[test]
    fn evict_oldest() {
        let mut buffer = EvictingBuffer::new(2, MAX_AGE, 10);
        let start = Instant::now();
        buffer.push(5, after(start, 0), 'A');
        buffer.push(6, after(start, 1), 'B');

        // Nothing is old enough yet.
        buffer.maybe_evict(after(start, 1));
        assert_eq!(buffer.get(5), Some(&'A'));
        assert_eq!(buffer.get(6), Some(&'B'));

        // Only the first entry has exceeded the max age.
        buffer.maybe_evict(after(start, 11));
        assert_eq!(buffer.get(5), None);
        assert_eq!(buffer.get(6), Some(&'B'));
    }

    #[test]
    fn evict_with_gap() {
        let mut buffer = EvictingBuffer::new(4, MAX_AGE, 10);
        let start = Instant::now();
        buffer.push(5, after(start, 0), 'A');
        buffer.push(7, after(start, 2), 'C');
        buffer.push(8, after(start, 3), 'D');

        buffer.maybe_evict(after(start, 13));

        assert_eq!(buffer.get(5), None);
        assert_eq!(buffer.get(7), None);
        assert_eq!(buffer.get(8), Some(&'D'));
    }

    #[test]
    fn evict_all() {
        let mut buffer = EvictingBuffer::new(4, MAX_AGE, 10);
        let start = Instant::now();
        buffer.push(5, after(start, 0), 'A');
        buffer.push(6, after(start, 1), 'B');

        buffer.maybe_evict(after(start, 12));

        assert_eq!(buffer.get(5), None);
        assert_eq!(buffer.get(6), None);
    }

    #[test]
    fn grow_and_reindex() {
        let mut buffer = EvictingBuffer::new(2, MAX_AGE, 10);
        let start = Instant::now();
        buffer.push(2, after(start, 0), 'A');
        buffer.push(3, after(start, 1), 'B');
        assert_eq!(buffer_cmp(&buffer), vec![Some('A'), Some('B')]);

        // Position 4 collides with 2, forcing a grow to three slots.
        buffer.push(4, after(start, 2), 'C');
        assert_eq!(buffer_cmp(&buffer), vec![Some('B'), Some('C'), Some('A')]);

        assert_eq!(buffer.get(2), Some(&'A'));
        assert_eq!(buffer.get(3), Some(&'B'));
        assert_eq!(buffer.get(4), Some(&'C'));
    }
}