#![allow(dead_code)]
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
#[derive(Debug, Clone, Copy, Default)]
pub struct RingBufferStats {
pub capacity: usize,
pub len: usize,
pub push_count: u64,
pub pop_count: u64,
pub overflow_count: u64,
}
pub struct RingBuffer<T> {
data: Vec<Option<T>>,
head: usize,
tail: usize,
capacity: usize,
len: usize,
push_count: u64,
pop_count: u64,
overflow_count: u64,
}
impl<T> RingBuffer<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "RingBuffer capacity must be non-zero");
let mut data = Vec::with_capacity(capacity);
for _ in 0..capacity {
data.push(None);
}
Self {
data,
head: 0,
tail: 0,
capacity,
len: 0,
push_count: 0,
pop_count: 0,
overflow_count: 0,
}
}
#[must_use]
pub fn len(&self) -> usize {
self.len
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len == 0
}
#[must_use]
pub fn is_full(&self) -> bool {
self.len == self.capacity
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn push(&mut self, item: T) -> bool {
if self.is_full() {
self.overflow_count += 1;
return false;
}
self.data[self.head] = Some(item);
self.head = (self.head + 1) % self.capacity;
self.len += 1;
self.push_count += 1;
true
}
pub fn push_overwrite(&mut self, item: T) {
if self.is_full() {
self.tail = (self.tail + 1) % self.capacity;
self.len -= 1;
self.overflow_count += 1;
}
self.data[self.head] = Some(item);
self.head = (self.head + 1) % self.capacity;
self.len += 1;
self.push_count += 1;
}
pub fn pop(&mut self) -> Option<T> {
if self.is_empty() {
return None;
}
let item = self.data[self.tail].take();
self.tail = (self.tail + 1) % self.capacity;
self.len -= 1;
self.pop_count += 1;
item
}
#[must_use]
pub fn peek(&self) -> Option<&T> {
if self.is_empty() {
return None;
}
self.data[self.tail].as_ref()
}
pub fn clear(&mut self) {
for slot in &mut self.data {
*slot = None;
}
self.head = 0;
self.tail = 0;
self.len = 0;
}
#[must_use]
pub fn stats(&self) -> RingBufferStats {
RingBufferStats {
capacity: self.capacity,
len: self.len,
push_count: self.push_count,
pop_count: self.pop_count,
overflow_count: self.overflow_count,
}
}
pub fn iter(&self) -> RingBufferIter<'_, T> {
RingBufferIter {
buffer: self,
index: 0,
}
}
}
pub struct RingBufferIter<'a, T> {
buffer: &'a RingBuffer<T>,
index: usize,
}
impl<'a, T> Iterator for RingBufferIter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.buffer.len {
return None;
}
let slot = (self.buffer.tail + self.index) % self.buffer.capacity;
self.index += 1;
self.buffer.data[slot].as_ref()
}
}
pub struct MediaFrameQueue {
video: RingBuffer<Vec<u8>>,
audio: RingBuffer<Vec<f32>>,
pub video_pts: VecDeque<i64>,
pub audio_pts: VecDeque<i64>,
}
impl MediaFrameQueue {
#[must_use]
pub fn new(video_cap: usize, audio_cap: usize) -> Self {
Self {
video: RingBuffer::new(video_cap),
audio: RingBuffer::new(audio_cap),
video_pts: VecDeque::with_capacity(video_cap),
audio_pts: VecDeque::with_capacity(audio_cap),
}
}
pub fn push_video(&mut self, frame: Vec<u8>, pts: i64) -> bool {
if self.video.push(frame) {
self.video_pts.push_back(pts);
true
} else {
false
}
}
pub fn push_audio(&mut self, samples: Vec<f32>, pts: i64) -> bool {
if self.audio.push(samples) {
self.audio_pts.push_back(pts);
true
} else {
false
}
}
pub fn pop_video(&mut self) -> Option<(Vec<u8>, i64)> {
let frame = self.video.pop()?;
let pts = self.video_pts.pop_front().unwrap_or(0);
Some((frame, pts))
}
pub fn pop_audio(&mut self) -> Option<(Vec<f32>, i64)> {
let samples = self.audio.pop()?;
let pts = self.audio_pts.pop_front().unwrap_or(0);
Some((samples, pts))
}
#[must_use]
pub fn sync_offset_ms(&self) -> Option<i64> {
let v = *self.video_pts.front()?;
let a = *self.audio_pts.front()?;
Some(v - a)
}
#[must_use]
pub fn video_len(&self) -> usize {
self.video.len()
}
#[must_use]
pub fn audio_len(&self) -> usize {
self.audio.len()
}
}
pub struct SpscRingBuffer<T> {
slots: Box<[Mutex<Option<T>>]>,
head: AtomicUsize,
tail: AtomicUsize,
capacity: usize,
mask: usize,
}
impl<T> SpscRingBuffer<T> {
#[must_use]
pub fn new(min_capacity: usize) -> Self {
let capacity = min_capacity.max(2).next_power_of_two();
let mask = capacity - 1;
let slots: Box<[Mutex<Option<T>>]> = (0..capacity)
.map(|_| Mutex::new(None))
.collect::<Vec<_>>()
.into_boxed_slice();
Self {
slots,
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
capacity,
mask,
}
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
#[must_use]
pub fn len(&self) -> usize {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
head.wrapping_sub(tail)
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
}
#[must_use]
pub fn is_full(&self) -> bool {
self.len() == self.capacity
}
pub fn push(&self, item: T) -> bool {
let head = self.head.load(Ordering::Relaxed);
let tail = self.tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) == self.capacity {
return false; }
let slot = &self.slots[head & self.mask];
if let Ok(mut guard) = slot.lock() {
*guard = Some(item);
}
self.head.store(head.wrapping_add(1), Ordering::Release);
true
}
pub fn pop(&self) -> Option<T> {
let tail = self.tail.load(Ordering::Relaxed);
let head = self.head.load(Ordering::Acquire);
if head == tail {
return None; }
let slot = &self.slots[tail & self.mask];
let item = slot.lock().ok().and_then(|mut g| g.take());
self.tail.store(tail.wrapping_add(1), Ordering::Release);
item
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for SpscRingBuffer<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SpscRingBuffer")
.field("capacity", &self.capacity)
.field("len", &self.len())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_empty() {
let rb: RingBuffer<i32> = RingBuffer::new(4);
assert!(rb.is_empty());
assert!(!rb.is_full());
assert_eq!(rb.len(), 0);
assert_eq!(rb.capacity(), 4);
}
#[test]
fn test_push_pop_ordering() {
let mut rb = RingBuffer::new(4);
assert!(rb.push(1));
assert!(rb.push(2));
assert!(rb.push(3));
assert_eq!(rb.pop(), Some(1));
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), Some(3));
assert_eq!(rb.pop(), None);
}
#[test]
fn test_push_when_full_returns_false() {
let mut rb = RingBuffer::new(2);
assert!(rb.push(10));
assert!(rb.push(20));
assert!(rb.is_full());
assert!(!rb.push(30));
assert_eq!(rb.stats().overflow_count, 1);
}
#[test]
fn test_push_overwrite_evicts_oldest() {
let mut rb = RingBuffer::new(3);
rb.push_overwrite(1);
rb.push_overwrite(2);
rb.push_overwrite(3);
rb.push_overwrite(4);
assert_eq!(rb.len(), 3);
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), Some(3));
assert_eq!(rb.pop(), Some(4));
}
#[test]
fn test_peek_does_not_consume() {
let mut rb = RingBuffer::new(4);
rb.push(42);
assert_eq!(rb.peek(), Some(&42));
assert_eq!(rb.len(), 1);
assert_eq!(rb.pop(), Some(42));
}
#[test]
fn test_clear() {
let mut rb = RingBuffer::new(4);
rb.push(1);
rb.push(2);
rb.clear();
assert!(rb.is_empty());
assert_eq!(rb.pop(), None);
}
#[test]
fn test_iter_order() {
let mut rb = RingBuffer::new(5);
for i in 0..4_i32 {
rb.push(i);
}
let collected: Vec<i32> = rb.iter().copied().collect();
assert_eq!(collected, vec![0, 1, 2, 3]);
}
#[test]
fn test_wrap_around() {
let mut rb = RingBuffer::new(3);
rb.push(1);
rb.push(2);
rb.pop(); rb.push(3);
rb.push(4);
assert_eq!(rb.len(), 3);
assert_eq!(rb.pop(), Some(2));
assert_eq!(rb.pop(), Some(3));
assert_eq!(rb.pop(), Some(4));
}
#[test]
fn test_stats_tracking() {
let mut rb = RingBuffer::new(2);
rb.push(1);
rb.push(2);
rb.push(3); rb.pop();
let s = rb.stats();
assert_eq!(s.push_count, 2);
assert_eq!(s.pop_count, 1);
assert_eq!(s.overflow_count, 1);
assert_eq!(s.len, 1);
}
#[test]
fn test_media_frame_queue_push_pop_video() {
let mut q = MediaFrameQueue::new(4, 4);
assert!(q.push_video(vec![1, 2, 3], 1000));
let (frame, pts) = q.pop_video().expect("should have video");
assert_eq!(frame, vec![1, 2, 3]);
assert_eq!(pts, 1000);
}
#[test]
fn test_media_frame_queue_push_pop_audio() {
let mut q = MediaFrameQueue::new(4, 4);
assert!(q.push_audio(vec![0.5_f32, -0.5], 2000));
let (samples, pts) = q.pop_audio().expect("should have audio");
assert_eq!(pts, 2000);
assert!((samples[0] - 0.5).abs() < 1e-6);
}
#[test]
fn test_sync_offset_ms() {
let mut q = MediaFrameQueue::new(4, 4);
q.push_video(vec![], 1050);
q.push_audio(vec![], 1000);
assert_eq!(q.sync_offset_ms(), Some(50));
}
#[test]
fn test_sync_offset_ms_empty() {
let q = MediaFrameQueue::new(4, 4);
assert!(q.sync_offset_ms().is_none());
}
#[test]
fn test_spsc_new_capacity_power_of_two() {
let buf: SpscRingBuffer<u32> = SpscRingBuffer::new(5);
assert_eq!(buf.capacity(), 8); let buf2: SpscRingBuffer<u32> = SpscRingBuffer::new(8);
assert_eq!(buf2.capacity(), 8);
let buf3: SpscRingBuffer<u32> = SpscRingBuffer::new(1);
assert_eq!(buf3.capacity(), 2); }
#[test]
fn test_spsc_push_pop_fifo() {
let buf: SpscRingBuffer<i32> = SpscRingBuffer::new(4);
assert!(buf.push(10));
assert!(buf.push(20));
assert!(buf.push(30));
assert_eq!(buf.pop(), Some(10));
assert_eq!(buf.pop(), Some(20));
assert_eq!(buf.pop(), Some(30));
assert_eq!(buf.pop(), None);
}
#[test]
fn test_spsc_is_empty_is_full() {
let buf: SpscRingBuffer<u8> = SpscRingBuffer::new(2);
assert!(buf.is_empty());
assert!(!buf.is_full());
buf.push(1);
buf.push(2);
assert!(buf.is_full());
assert!(!buf.is_empty());
}
#[test]
fn test_spsc_push_when_full_returns_false() {
let buf: SpscRingBuffer<u8> = SpscRingBuffer::new(2);
assert!(buf.push(1));
assert!(buf.push(2));
assert!(!buf.push(3)); assert_eq!(buf.len(), 2);
}
#[test]
fn test_spsc_wrap_around() {
let buf: SpscRingBuffer<u32> = SpscRingBuffer::new(4);
for i in 0..4 {
assert!(buf.push(i));
}
assert_eq!(buf.pop(), Some(0));
assert_eq!(buf.pop(), Some(1));
assert!(buf.push(4));
assert!(buf.push(5));
assert_eq!(buf.pop(), Some(2));
assert_eq!(buf.pop(), Some(3));
assert_eq!(buf.pop(), Some(4));
assert_eq!(buf.pop(), Some(5));
assert_eq!(buf.pop(), None);
}
#[test]
fn test_spsc_len() {
let buf: SpscRingBuffer<u32> = SpscRingBuffer::new(8);
assert_eq!(buf.len(), 0);
buf.push(1);
buf.push(2);
assert_eq!(buf.len(), 2);
buf.pop();
assert_eq!(buf.len(), 1);
}
#[test]
fn test_spsc_threaded_producer_consumer() {
use std::sync::Arc;
const N: u32 = 1000;
let buf: Arc<SpscRingBuffer<u32>> = Arc::new(SpscRingBuffer::new(64));
let producer = Arc::clone(&buf);
let handle = std::thread::spawn(move || {
for i in 0..N {
while !producer.push(i) {
std::thread::yield_now();
}
}
});
let mut received = Vec::with_capacity(N as usize);
while received.len() < N as usize {
if let Some(v) = buf.pop() {
received.push(v);
} else {
std::thread::yield_now();
}
}
handle.join().expect("producer thread should not panic");
let expected: Vec<u32> = (0..N).collect();
assert_eq!(received, expected);
}
}