1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//! Bounded FIFO ring with drop-oldest-on-overflow semantics.
//!
//! Used by buffering audit appenders (meshos action chain, meshos
//! admin audit, meshos log chain) and the fold-runtime ring audit
//! sink — every one of those wants the same shape: a thread-safe
//! `VecDeque<T>` capped at `capacity`, that drops the oldest entry
//! and increments a counter when full, with snapshot via `Clone`.
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
/// Thread-safe bounded FIFO. On push past capacity, drops the
/// oldest entry and increments [`Self::dropped`].
///
/// `capacity == 0` is accepted and treated as "store nothing" —
/// every `push` increments the dropped counter and returns
/// without growing the deque.
#[derive(Debug)]
pub struct BoundedRing<T> {
    /// Maximum number of items retained at once; fixed at
    /// construction. Zero means nothing is ever stored.
    capacity: usize,
    /// Stored items behind a mutex. New items are appended at the
    /// back and the oldest is evicted from the front.
    items: parking_lot::Mutex<VecDeque<T>>,
    /// Running count of items discarded on overflow (or rejected
    /// outright when `capacity == 0`).
    dropped: AtomicU64,
}
impl<T> BoundedRing<T> {
    /// Creates an empty ring that retains at most `capacity` items.
    ///
    /// The backing deque starts empty and grows as items are pushed.
    /// A `capacity` of `0` yields a ring that stores nothing and
    /// counts every push as a drop.
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            items: parking_lot::Mutex::new(VecDeque::new()),
            dropped: AtomicU64::new(0),
        }
    }

    /// Returns the capacity the ring was constructed with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the number of items currently stored (always
    /// `<= capacity`). Briefly takes the ring's lock.
    pub fn len(&self) -> usize {
        self.items.lock().len()
    }

    /// Returns `true` when the ring currently holds no items.
    /// Briefly takes the ring's lock.
    pub fn is_empty(&self) -> bool {
        self.items.lock().is_empty()
    }

    /// Returns the total number of items dropped because the ring
    /// was at capacity (or because `capacity == 0`). The value only
    /// ever grows.
    ///
    /// The counter is read with relaxed ordering and without taking
    /// the lock, so it is a standalone statistic rather than a value
    /// synchronized with the ring's contents.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }

    /// Appends `item` as the newest entry.
    ///
    /// If the ring is already full, the oldest entry is evicted to
    /// make room and the dropped counter is incremented. With
    /// `capacity == 0` nothing is stored: `item` itself is discarded
    /// and counted.
    ///
    /// Returns `true` if a drop occurred (either the displaced
    /// oldest entry, or — when `capacity == 0` — the just-pushed
    /// item itself), `false` otherwise.
    ///
    /// A discarded value is always destroyed *after* the ring's lock
    /// has been released, so a slow `Drop` impl does not stall other
    /// users of the ring and a `Drop` impl that touches this ring
    /// cannot deadlock on the non-reentrant mutex.
    pub fn push(&self, item: T) -> bool {
        if self.capacity == 0 {
            // Nothing can be stored, so the lock is never taken on
            // this path.
            self.dropped.fetch_add(1, Ordering::Relaxed);
            drop(item);
            return true;
        }

        let mut items = self.items.lock();
        if items.len() < self.capacity {
            items.push_back(item);
            return false;
        }

        // Full: swap the oldest entry for the new one, but keep the
        // evicted value alive until the guard is gone.
        let evicted = items.pop_front();
        items.push_back(item);
        self.dropped.fetch_add(1, Ordering::Relaxed);

        // Release the lock first, then run the evicted item's
        // destructor outside the critical section.
        drop(items);
        drop(evicted);
        true
    }
}
impl<T: Clone> BoundedRing<T> {
    /// Returns a copy of the stored items, ordered oldest first and
    /// newest last.
    ///
    /// Each item is cloned while the lock is held; the lock is
    /// released before the vector is handed back, so the caller can
    /// work with the result without blocking the ring.
    pub fn snapshot(&self) -> Vec<T> {
        let guard = self.items.lock();
        let mut copy = Vec::with_capacity(guard.len());
        copy.extend(guard.iter().cloned());
        copy
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes that fit within the capacity are all retained and are
    /// neither reported nor counted as drops.
    #[test]
    fn push_under_capacity_does_not_drop() {
        let ring = BoundedRing::new(4);
        for value in [1, 2] {
            assert!(!ring.push(value));
        }
        assert_eq!(ring.len(), 2);
        assert_eq!(ring.dropped(), 0);
    }

    /// Once the ring is full, each further push evicts the oldest
    /// entry, reports the drop, and bumps the counter.
    #[test]
    fn push_at_capacity_drops_oldest() {
        let ring = BoundedRing::new(2);

        // The first two pushes fill the ring without displacing
        // anything.
        for value in [1, 2] {
            assert!(!ring.push(value));
        }

        // The next two each displace the current oldest entry.
        for value in [3, 4] {
            assert!(ring.push(value));
        }

        let kept = ring.snapshot();
        assert_eq!(kept, vec![3, 4]);
        assert_eq!(ring.dropped(), 2);
    }

    /// A zero-capacity ring discards every pushed item and counts
    /// each one as dropped.
    #[test]
    fn zero_capacity_stores_nothing() {
        let ring = BoundedRing::new(0);
        for value in [1, 2] {
            assert!(ring.push(value));
        }
        assert!(ring.is_empty());
        assert_eq!(ring.len(), 0);
        assert!(ring.snapshot().is_empty());
        assert_eq!(ring.dropped(), 2);
    }
}