use crate::{TelemetryError, TelemetryResult};
use alloc::collections::VecDeque;
/// Reports how many bytes a value should be charged against a byte budget.
///
/// [`BoundedDeque`] calls this when an item is pushed and again when it is
/// removed, so an item should report the same cost for as long as it is queued.
pub trait ByteCost {
/// Returns the number of bytes this value counts for.
fn byte_cost(&self) -> usize;
}
/// Converts a floating-point growth multiplier into an integer ratio
/// `(numerator, denominator)` with a fixed denominator of 1024.
///
/// The multiplier is first clamped to `1.01..=16.0`. The numerator is the
/// truncated product of the clamped multiplier and the denominator, and is
/// never allowed to drop below `denominator + 1`, so the resulting ratio is
/// always strictly greater than one (this floor is what a NaN input ends up
/// with, since the float-to-integer cast maps NaN to zero).
#[inline]
fn float_to_ratio(mult: f64) -> (usize, usize) {
    const DEN: usize = 1024;
    const MIN_MULT: f64 = 1.01;
    const MAX_MULT: f64 = 16.0;

    let bounded = f64::clamp(mult, MIN_MULT, MAX_MULT);
    let scaled = (bounded * DEN as f64) as usize;
    // Guarantee a ratio above 1 even if the scaled value collapsed to zero.
    let numerator = if scaled > DEN { scaled } else { DEN + 1 };
    (numerator, DEN)
}
/// A double-ended queue limited by both a total byte budget and an element cap.
///
/// Byte usage is tracked through [`ByteCost::byte_cost`] of the stored items.
#[derive(Debug, Clone)]
pub struct BoundedDeque<T> {
// Backing storage for the queued items.
q: VecDeque<T>,
// Upper limit on the summed `byte_cost` of all queued items.
max_bytes: usize,
// Running total of the `byte_cost` of the items currently in `q`.
cur_bytes: usize,
// Upper limit on the number of queued items; the constructor derives it as
// `max_bytes / max(size_of::<T>(), 1)`, with a minimum of 1.
max_elems: usize,
// Numerator of the capacity growth ratio (produced by `float_to_ratio`).
grow_num: usize,
// Denominator of the capacity growth ratio (produced by `float_to_ratio`).
grow_den: usize,
}
impl<T: ByteCost> BoundedDeque<T> {
    /// Makes room for an item of `cost` bytes that is about to be appended in
    /// FIFO mode, evicting the oldest items (front of the queue) as needed.
    ///
    /// # Errors
    ///
    /// Returns [`TelemetryError::PacketTooLarge`] when `cost` alone exceeds the
    /// byte budget; nothing is evicted in that case.
    fn prepare_push_fifo(&mut self, cost: usize) -> TelemetryResult<()> {
        if cost > self.max_bytes {
            return Err(TelemetryError::PacketTooLarge(
                "Item exceeds maximum byte budget",
            ));
        }
        // `saturating_add` keeps the comparison correct even for budgets close
        // to `usize::MAX`, where a plain `+` could overflow.
        while !self.q.is_empty() && self.cur_bytes.saturating_add(cost) > self.max_bytes {
            let _ = self.pop_front();
        }
        if self.q.len() >= self.max_elems {
            let _ = self.pop_front();
        }
        self.ensure_room_for_one();
        Ok(())
    }

    /// Creates an empty queue.
    ///
    /// * `max_bytes` - total byte budget for the queued items.
    /// * `starting_bytes` - byte size used to pick the initial capacity.
    /// * `grow_mult` - factor by which the capacity grows when it is exhausted.
    ///
    /// The element cap is `max_bytes / max(size_of::<T>(), 1)` (at least 1), and
    /// the initial capacity is `starting_bytes` divided by the same per-item
    /// size, clamped to `1..=max_elems`.
    ///
    /// # Panics
    ///
    /// Panics if `starting_bytes > max_bytes`, if `max_bytes == 0`, or if
    /// `grow_mult <= 1.0`. A NaN `grow_mult` is not rejected by this check.
    pub fn new(max_bytes: usize, starting_bytes: usize, grow_mult: f64) -> Self {
        if starting_bytes > max_bytes {
            panic!(
                "starting_bytes ({}) must be less than max_bytes ({}) to avoid conflicts",
                starting_bytes, max_bytes
            );
        }
        if max_bytes == 0 {
            panic!("max_bytes must be greater than 0");
        }
        if grow_mult <= 1.0 {
            panic!("grow_mult must be greater than 1.0");
        }
        // Zero-sized types are treated as one byte so the divisions are defined.
        let min_cost = core::mem::size_of::<T>().max(1);
        let max_elems = (max_bytes / min_cost).max(1);
        let starting_elems = starting_bytes / min_cost;
        let start_cap = starting_elems.clamp(1, max_elems);
        let (grow_num, grow_den) = float_to_ratio(grow_mult);
        Self {
            q: VecDeque::with_capacity(start_cap),
            max_bytes,
            cur_bytes: 0,
            max_elems,
            grow_num,
            grow_den,
        }
    }

    /// Returns the number of queued items.
    #[inline]
    pub fn len(&self) -> usize {
        self.q.len()
    }

    /// Returns `true` when no items are queued.
    #[allow(dead_code)]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.q.is_empty()
    }

    /// Iterates over the queued items from front to back.
    #[allow(dead_code)]
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.q.iter()
    }

    /// Returns `true` when an item equal to `v` is queued.
    #[inline]
    pub fn contains(&self, v: &T) -> bool
    where
        T: PartialEq,
    {
        self.q.contains(v)
    }

    /// Removes every item and resets the byte usage to zero.
    #[inline]
    pub fn clear(&mut self) {
        self.q.clear();
        self.cur_bytes = 0;
    }

    /// Returns the tracked byte usage of the queued items.
    #[allow(dead_code)]
    #[inline]
    pub fn bytes_used(&self) -> usize {
        self.cur_bytes
    }

    /// Returns the byte budget.
    #[allow(dead_code)]
    #[inline]
    pub fn max_bytes(&self) -> usize {
        self.max_bytes
    }

    /// Returns the element cap.
    #[allow(dead_code)]
    #[inline]
    pub fn max_elems(&self) -> usize {
        self.max_elems
    }

    /// Returns the capacity of the backing storage.
    #[allow(dead_code)]
    #[inline]
    pub fn capacity(&self) -> usize {
        self.q.capacity()
    }

    /// Removes and returns the front item, releasing its bytes from the budget.
    pub fn pop_front(&mut self) -> Option<T> {
        let v = self.q.pop_front()?;
        self.cur_bytes = self.cur_bytes.saturating_sub(v.byte_cost());
        Some(v)
    }

    /// Removes and returns the back item, releasing its bytes from the budget.
    #[allow(dead_code)]
    pub fn pop_back(&mut self) -> Option<T> {
        let v = self.q.pop_back()?;
        self.cur_bytes = self.cur_bytes.saturating_sub(v.byte_cost());
        Some(v)
    }

    /// Removes and returns the item at `idx`, or `None` if `idx` is out of
    /// range; the removed item's bytes are released from the budget.
    pub fn remove_pos(&mut self, idx: usize) -> Option<T> {
        let v = self.q.remove(idx)?;
        self.cur_bytes = self.cur_bytes.saturating_sub(v.byte_cost());
        Some(v)
    }

    /// Removes the first item equal to `needle`, if there is one.
    pub fn remove_value(&mut self, needle: &T)
    where
        T: PartialEq,
    {
        if let Some(idx) = self.q.iter().position(|x| x == needle) {
            let _ = self.remove_pos(idx);
        }
    }

    /// Keeps only the items for which `keep` returns `true`.
    ///
    /// `keep` is called exactly once per item, front to back. The bytes of all
    /// dropped items are released from the budget.
    pub fn retain<F>(&mut self, mut keep: F)
    where
        F: FnMut(&T) -> bool,
    {
        // A single pass through `VecDeque::retain` avoids shifting the tail of
        // the queue once per removed element.
        let mut freed = 0usize;
        self.q.retain(|item| {
            let keep_item = keep(item);
            if !keep_item {
                freed = freed.saturating_add(item.byte_cost());
            }
            keep_item
        });
        self.cur_bytes = self.cur_bytes.saturating_sub(freed);
    }

    /// Guarantees that one more item can be stored without the backing storage
    /// reallocating on its own.
    ///
    /// When the storage is full it is grown by the configured ratio (by at
    /// least one slot, and never past the element cap). If the element cap is
    /// already reached, the front item is evicted instead.
    fn ensure_room_for_one(&mut self) {
        let len = self.q.len();
        let cap = self.q.capacity();
        if len < cap {
            return;
        }
        // From here on `len == cap`: a `VecDeque` never holds more items than
        // its capacity.
        if len >= self.max_elems {
            let _ = self.pop_front();
            return;
        }
        // `cap < max_elems`, so the target is always at least `cap + 1` and
        // the subtraction below cannot underflow. Round the scaled capacity up.
        let scaled = cap
            .saturating_mul(self.grow_num)
            .saturating_add(self.grow_den - 1);
        let target_cap = (scaled / self.grow_den)
            .max(cap + 1)
            .min(self.max_elems);
        // `reserve_exact` counts from `len`, which equals `cap` here.
        self.q.reserve_exact(target_cap - cap);
        debug_assert!(self.q.len() < self.q.capacity());
    }

    /// Appends `v` at the back, evicting the oldest items from the front until
    /// both the byte budget and the element cap are respected.
    ///
    /// # Errors
    ///
    /// Returns [`TelemetryError::PacketTooLarge`] when `v` alone exceeds the
    /// byte budget; the queue is left untouched in that case.
    pub fn push_back(&mut self, v: T) -> TelemetryResult<()> {
        let cost = v.byte_cost();
        self.prepare_push_fifo(cost)?;
        self.q.push_back(v);
        self.cur_bytes = self.cur_bytes.saturating_add(cost);
        Ok(())
    }

    /// Inserts `v` ahead of the first queued item with a strictly lower
    /// priority (so items of equal priority stay in insertion order), or at the
    /// back when there is no such item.
    ///
    /// When the byte budget or the element cap would be exceeded, items are
    /// evicted from the back. A back item is only evicted if its priority is
    /// not higher than the priority of `v`.
    ///
    /// # Errors
    ///
    /// * [`TelemetryError::PacketTooLarge`] when `v` alone exceeds the byte
    ///   budget.
    /// * [`TelemetryError::Io`] when making room would require evicting an item
    ///   with a higher priority than `v`.
    ///
    /// The queue is left unchanged whenever an error is returned.
    pub fn push_back_prioritized<F>(&mut self, v: T, mut priority_of: F) -> TelemetryResult<()>
    where
        F: FnMut(&T) -> u8,
    {
        let cost = v.byte_cost();
        if cost > self.max_bytes {
            return Err(TelemetryError::PacketTooLarge(
                "Item exceeds maximum byte budget",
            ));
        }
        let new_priority = priority_of(&v);

        // Dry run: walk the queue from the back and work out how many items
        // would have to go. Nothing is mutated until the push is known to
        // succeed, so a rejected push never drops queued items.
        let mut bytes_after = self.cur_bytes;
        let mut len_after = self.q.len();
        let mut evictions = 0usize;
        for tail in self.q.iter().rev() {
            let fits = bytes_after.saturating_add(cost) <= self.max_bytes
                && len_after < self.max_elems;
            if fits {
                break;
            }
            if priority_of(tail) > new_priority {
                return Err(TelemetryError::Io("priority queue saturated"));
            }
            bytes_after = bytes_after.saturating_sub(tail.byte_cost());
            len_after -= 1;
            evictions += 1;
        }
        for _ in 0..evictions {
            let _ = self.pop_back();
        }

        self.ensure_room_for_one();
        let insert_at = self
            .q
            .iter()
            .position(|existing| priority_of(existing) < new_priority);
        match insert_at {
            Some(idx) => self.q.insert(idx, v),
            None => self.q.push_back(v),
        }
        self.cur_bytes = self.cur_bytes.saturating_add(cost);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Queue element with an explicit byte cost and priority.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Item {
        id: u8,
        cost: usize,
        priority: u8,
    }

    impl ByteCost for Item {
        fn byte_cost(&self) -> usize {
            self.cost
        }
    }

    /// Shorthand constructor for a test item.
    fn item(id: u8, cost: usize, priority: u8) -> Item {
        Item { id, cost, priority }
    }

    /// Priority accessor handed to `push_back_prioritized`.
    fn priority_of(item: &Item) -> u8 {
        item.priority
    }

    #[test]
    fn prioritized_queue_preserves_fifo_within_same_priority() {
        let mut q: BoundedDeque<Item> = BoundedDeque::new(64, 16, 2.0);
        q.push_back_prioritized(item(1, 1, 10), priority_of).unwrap();
        q.push_back_prioritized(item(2, 1, 10), priority_of).unwrap();
        q.push_back_prioritized(item(3, 1, 20), priority_of).unwrap();

        // Highest priority first, then the two equal-priority items in
        // insertion order.
        for expected_id in [3u8, 1, 2] {
            assert_eq!(q.pop_front().unwrap().id, expected_id);
        }
    }

    #[test]
    fn prioritized_queue_drops_lower_priority_when_saturated() {
        let mut q: BoundedDeque<Item> = BoundedDeque::new(64, 32, 2.0);
        q.push_back_prioritized(item(1, 32, 20), priority_of).unwrap();
        q.push_back_prioritized(item(2, 32, 20), priority_of).unwrap();

        // The budget is full of higher-priority items, so this one is refused.
        let rejected = q.push_back_prioritized(item(3, 32, 10), priority_of);
        let err = rejected.unwrap_err();
        assert!(matches!(
            err,
            TelemetryError::Io("priority queue saturated")
        ));

        for expected_id in [1u8, 2] {
            assert_eq!(q.pop_front().unwrap().id, expected_id);
        }
    }

    #[test]
    fn bounded_queue_never_exceeds_element_cap_or_capacity() {
        let mut q: BoundedDeque<Item> = BoundedDeque::new(256, 8, 2.0);
        let max_elems = q.max_elems();
        let max_bytes = q.max_bytes();

        let total_pushes = max_elems * 4;
        for id in 0..total_pushes {
            q.push_back(item(id as u8, 1, 0)).unwrap();
            assert!(q.len() <= max_elems);
            assert!(q.capacity() <= max_elems);
            assert!(q.bytes_used() <= max_bytes);
        }
        assert_eq!(q.len(), max_elems);
    }

    #[test]
    fn bounded_queue_never_exceeds_byte_budget_under_eviction() {
        let mut q: BoundedDeque<Item> = BoundedDeque::new(48, 8, 2.0);
        let max_elems = q.max_elems();
        let max_bytes = q.max_bytes();

        for id in 0..32u8 {
            q.push_back(item(id, 17, 0)).unwrap();
            assert!(q.len() <= max_elems);
            assert!(q.capacity() <= max_elems);
            assert!(q.bytes_used() <= max_bytes);
        }

        // The tracked usage must match what is actually still queued.
        let retained_bytes: usize = q.iter().map(|queued| queued.byte_cost()).sum();
        assert_eq!(q.bytes_used(), retained_bytes);
    }
}