use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub struct RingBuffer<T> {
buffer: Vec<Option<T>>,
read_pos: Arc<AtomicUsize>,
write_pos: Arc<AtomicUsize>,
capacity: usize,
}
impl<T> RingBuffer<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
let mut buffer = Vec::with_capacity(capacity);
for _ in 0..capacity {
buffer.push(None);
}
Self {
buffer,
read_pos: Arc::new(AtomicUsize::new(0)),
write_pos: Arc::new(AtomicUsize::new(0)),
capacity,
}
}
pub fn push(&mut self, item: T) -> Result<(), T> {
let write = self.write_pos.load(Ordering::Acquire);
let read = self.read_pos.load(Ordering::Acquire);
let next_write = (write + 1) % self.capacity;
if next_write == read {
return Err(item);
}
self.buffer[write] = Some(item);
self.write_pos.store(next_write, Ordering::Release);
Ok(())
}
pub fn pop(&mut self) -> Option<T> {
let read = self.read_pos.load(Ordering::Acquire);
let write = self.write_pos.load(Ordering::Acquire);
if read == write {
return None;
}
let item = self.buffer[read].take();
let next_read = (read + 1) % self.capacity;
self.read_pos.store(next_read, Ordering::Release);
item
}
#[must_use]
pub fn len(&self) -> usize {
let write = self.write_pos.load(Ordering::Acquire);
let read = self.read_pos.load(Ordering::Acquire);
if write >= read {
write - read
} else {
self.capacity - read + write
}
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn is_full(&self) -> bool {
let write = self.write_pos.load(Ordering::Acquire);
let read = self.read_pos.load(Ordering::Acquire);
let next_write = (write + 1) % self.capacity;
next_write == read
}
#[must_use]
pub const fn capacity(&self) -> usize {
self.capacity
}
pub fn clear(&mut self) {
while self.pop().is_some() {}
}
}
pub struct PacketBuffer {
buffer: RingBuffer<crate::packet::Packet>,
high_watermark: usize,
low_watermark: usize,
overflow_count: u64,
underflow_count: u64,
}
impl PacketBuffer {
#[must_use]
pub fn new(capacity: usize) -> Self {
Self {
buffer: RingBuffer::new(capacity),
high_watermark: (capacity * 3) / 4,
low_watermark: capacity / 4,
overflow_count: 0,
underflow_count: 0,
}
}
pub fn add(&mut self, packet: crate::packet::Packet) -> bool {
if let Ok(()) = self.buffer.push(packet) {
if self.buffer.len() >= self.high_watermark {
tracing::warn!("Packet buffer high watermark reached");
}
true
} else {
self.overflow_count += 1;
tracing::error!("Packet buffer overflow");
false
}
}
pub fn get(&mut self) -> Option<crate::packet::Packet> {
match self.buffer.pop() {
Some(packet) => {
if self.buffer.len() <= self.low_watermark {
self.underflow_count += 1;
}
Some(packet)
}
None => None,
}
}
#[must_use]
pub fn len(&self) -> usize {
self.buffer.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
#[must_use]
pub const fn overflow_count(&self) -> u64 {
self.overflow_count
}
#[must_use]
pub const fn underflow_count(&self) -> u64 {
self.underflow_count
}
#[must_use]
pub fn occupancy(&self) -> f64 {
self.buffer.len() as f64 / self.buffer.capacity() as f64
}
pub fn clear(&mut self) {
self.buffer.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::packet::PacketBuilder;
use bytes::Bytes;
#[test]
fn test_ring_buffer_new() {
let buffer: RingBuffer<u32> = RingBuffer::new(10);
assert_eq!(buffer.capacity(), 10);
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
}
#[test]
fn test_ring_buffer_push_pop() {
let mut buffer = RingBuffer::new(5);
assert!(buffer.push(1).is_ok());
assert!(buffer.push(2).is_ok());
assert!(buffer.push(3).is_ok());
assert_eq!(buffer.len(), 3);
assert_eq!(buffer.pop(), Some(1));
assert_eq!(buffer.pop(), Some(2));
assert_eq!(buffer.pop(), Some(3));
assert_eq!(buffer.pop(), None);
}
#[test]
fn test_ring_buffer_full() {
let mut buffer = RingBuffer::new(3);
assert!(buffer.push(1).is_ok());
assert!(buffer.push(2).is_ok());
assert!(buffer.is_full());
assert!(buffer.push(3).is_err());
}
#[test]
fn test_ring_buffer_wrap_around() {
let mut buffer = RingBuffer::new(3);
buffer.push(1).expect("should succeed in test");
buffer.push(2).expect("should succeed in test");
assert_eq!(buffer.pop(), Some(1));
buffer.push(3).expect("should succeed in test");
assert_eq!(buffer.pop(), Some(2));
assert_eq!(buffer.pop(), Some(3));
}
#[test]
fn test_ring_buffer_clear() {
let mut buffer = RingBuffer::new(5);
buffer.push(1).expect("should succeed in test");
buffer.push(2).expect("should succeed in test");
buffer.push(3).expect("should succeed in test");
buffer.clear();
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
}
#[test]
fn test_packet_buffer() {
let mut buffer = PacketBuffer::new(10);
let packet = PacketBuilder::new(0)
.video()
.build(Bytes::from_static(b"test"))
.expect("should succeed in test");
assert!(buffer.add(packet));
assert_eq!(buffer.len(), 1);
assert!(!buffer.is_empty());
let retrieved = buffer.get();
assert!(retrieved.is_some());
assert_eq!(buffer.len(), 0);
}
#[test]
fn test_packet_buffer_overflow() {
let mut buffer = PacketBuffer::new(3);
for i in 0..5 {
let packet = PacketBuilder::new(i)
.video()
.build(Bytes::from_static(b"test"))
.expect("should succeed in test");
buffer.add(packet);
}
assert!(buffer.overflow_count() > 0);
}
#[test]
fn test_packet_buffer_occupancy() {
let mut buffer = PacketBuffer::new(10);
for i in 0..5 {
let packet = PacketBuilder::new(i)
.video()
.build(Bytes::from_static(b"test"))
.expect("should succeed in test");
buffer.add(packet);
}
assert!((buffer.occupancy() - 0.5).abs() < 0.01);
}
#[test]
fn test_packet_buffer_clear() {
let mut buffer = PacketBuffer::new(10);
for i in 0..5 {
let packet = PacketBuilder::new(i)
.video()
.build(Bytes::from_static(b"test"))
.expect("should succeed in test");
buffer.add(packet);
}
buffer.clear();
assert_eq!(buffer.len(), 0);
}
}