#![allow(dead_code)]
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, Weak};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameKind {
Video {
width: u32,
height: u32,
bytes_per_pixel: u32,
},
Audio {
channel_count: u32,
samples_per_frame: u32,
bytes_per_sample: u32,
},
Raw {
size: usize,
},
}
impl FrameKind {
#[must_use]
pub fn byte_size(self) -> usize {
match self {
Self::Video {
width,
height,
bytes_per_pixel,
} => (width as usize)
.saturating_mul(height as usize)
.saturating_mul(bytes_per_pixel as usize),
Self::Audio {
channel_count,
samples_per_frame,
bytes_per_sample,
} => (channel_count as usize)
.saturating_mul(samples_per_frame as usize)
.saturating_mul(bytes_per_sample as usize),
Self::Raw { size } => size,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct FramePoolConfig {
pub pre_alloc: usize,
pub max_pool_size: usize,
pub frame_kind: FrameKind,
pub allow_overflow: bool,
}
pub struct PooledFrame {
data: Vec<u8>,
seq: u64,
}
impl PooledFrame {
fn new(size: usize) -> Self {
Self {
data: vec![0u8; size],
seq: 0,
}
}
#[must_use]
pub fn data(&self) -> &[u8] {
&self.data
}
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data
}
#[must_use]
pub fn len(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
#[must_use]
pub fn seq(&self) -> u64 {
self.seq
}
pub fn clear(&mut self) {
self.data.fill(0);
}
}
pub struct AcquiredFrame {
frame: Arc<Mutex<PooledFrame>>,
pool: Option<Weak<PoolInner>>,
}
impl AcquiredFrame {
pub fn with<R, F: FnOnce(&PooledFrame) -> R>(&self, f: F) -> R {
let guard = self.frame.lock().expect("frame mutex poisoned");
f(&guard)
}
pub fn with_mut<R, F: FnOnce(&mut PooledFrame) -> R>(&self, f: F) -> R {
let mut guard = self.frame.lock().expect("frame mutex poisoned");
f(&mut guard)
}
pub fn seq(&self) -> u64 {
self.frame.lock().expect("frame mutex poisoned").seq
}
}
impl Drop for AcquiredFrame {
fn drop(&mut self) {
if let Some(weak) = self.pool.take() {
if let Some(inner) = weak.upgrade() {
let mut state = inner.state.lock().expect("pool mutex poisoned");
if state.idle.len() < state.max_pool_size {
inner.release_count.fetch_add(1, Ordering::Relaxed);
state.idle.push(Arc::clone(&self.frame));
}
}
}
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolStats {
pub pre_alloc: usize,
pub idle_count: usize,
pub acquire_count: u64,
pub release_count: u64,
pub overflow_count: u64,
}
struct PoolState {
idle: Vec<Arc<Mutex<PooledFrame>>>,
max_pool_size: usize,
}
struct PoolInner {
state: Mutex<PoolState>,
frame_byte_size: usize,
allow_overflow: bool,
pre_alloc: usize,
acquire_count: AtomicU64,
release_count: AtomicU64,
overflow_count: AtomicU64,
}
pub struct FramePool {
inner: Arc<PoolInner>,
seq: AtomicU64,
}
impl FramePool {
#[must_use]
pub fn new(config: FramePoolConfig) -> Self {
assert!(
config.max_pool_size > 0,
"FramePool max_pool_size must be non-zero"
);
let byte_size = config.frame_kind.byte_size();
let pre_alloc = config.pre_alloc.min(config.max_pool_size);
let mut idle = Vec::with_capacity(pre_alloc);
for _ in 0..pre_alloc {
idle.push(Arc::new(Mutex::new(PooledFrame::new(byte_size))));
}
let state = PoolState {
idle,
max_pool_size: config.max_pool_size,
};
Self {
inner: Arc::new(PoolInner {
state: Mutex::new(state),
frame_byte_size: byte_size,
allow_overflow: config.allow_overflow,
pre_alloc,
acquire_count: AtomicU64::new(0),
release_count: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
}),
seq: AtomicU64::new(0),
}
}
pub fn acquire(&self) -> Option<AcquiredFrame> {
let seq = self.seq.fetch_add(1, Ordering::Relaxed);
let mut state = self.inner.state.lock().expect("pool mutex poisoned");
if let Some(frame_arc) = state.idle.pop() {
drop(state);
{
let mut frame = frame_arc.lock().expect("frame mutex poisoned");
frame.seq = seq;
}
self.inner.acquire_count.fetch_add(1, Ordering::Relaxed);
Some(AcquiredFrame {
frame: frame_arc,
pool: Some(Arc::downgrade(&self.inner)),
})
} else if self.inner.allow_overflow {
drop(state);
self.inner.overflow_count.fetch_add(1, Ordering::Relaxed);
let mut frame = PooledFrame::new(self.inner.frame_byte_size);
frame.seq = seq;
Some(AcquiredFrame {
frame: Arc::new(Mutex::new(frame)),
pool: None, })
} else {
None
}
}
#[must_use]
pub fn stats(&self) -> PoolStats {
let state = self.inner.state.lock().expect("pool mutex poisoned");
PoolStats {
pre_alloc: self.inner.pre_alloc,
idle_count: state.idle.len(),
acquire_count: self.inner.acquire_count.load(Ordering::Relaxed),
release_count: self.inner.release_count.load(Ordering::Relaxed),
overflow_count: self.inner.overflow_count.load(Ordering::Relaxed),
}
}
pub fn idle_count(&self) -> usize {
self.inner
.state
.lock()
.expect("pool mutex poisoned")
.idle
.len()
}
pub fn max_pool_size(&self) -> usize {
self.inner
.state
.lock()
.expect("pool mutex poisoned")
.max_pool_size
}
pub fn frame_byte_size(&self) -> usize {
self.inner.frame_byte_size
}
pub fn warm(&self, target: usize) -> usize {
let mut state = self.inner.state.lock().expect("pool mutex poisoned");
let current = state.idle.len();
let to_add = target
.saturating_sub(current)
.min(state.max_pool_size.saturating_sub(current));
for _ in 0..to_add {
state.idle.push(Arc::new(Mutex::new(PooledFrame::new(
self.inner.frame_byte_size,
))));
}
to_add
}
pub fn shrink_to(&self, target: usize) -> usize {
let mut state = self.inner.state.lock().expect("pool mutex poisoned");
let current = state.idle.len();
let to_remove = current.saturating_sub(target);
for _ in 0..to_remove {
state.idle.pop();
}
to_remove
}
}
#[cfg(test)]
mod tests {
use super::*;
fn video_config(pre_alloc: usize, max: usize) -> FramePoolConfig {
FramePoolConfig {
pre_alloc,
max_pool_size: max,
frame_kind: FrameKind::Video {
width: 4,
height: 4,
bytes_per_pixel: 3,
},
allow_overflow: false,
}
}
#[test]
fn test_pre_alloc() {
let pool = FramePool::new(video_config(4, 8));
assert_eq!(pool.idle_count(), 4);
assert_eq!(pool.stats().pre_alloc, 4);
}
#[test]
fn test_acquire_and_release() {
let pool = FramePool::new(video_config(2, 4));
{
let _f = pool.acquire().expect("frame available");
assert_eq!(pool.idle_count(), 1);
}
assert_eq!(pool.idle_count(), 2);
assert_eq!(pool.stats().acquire_count, 1);
assert_eq!(pool.stats().release_count, 1);
}
#[test]
fn test_exhausted_no_overflow() {
let pool = FramePool::new(video_config(1, 2));
let _f1 = pool.acquire().expect("first frame");
assert!(pool.acquire().is_none());
}
#[test]
fn test_overflow_frame() {
let config = FramePoolConfig {
allow_overflow: true,
..video_config(0, 2)
};
let pool = FramePool::new(config);
let frame = pool.acquire().expect("overflow frame");
assert_eq!(pool.stats().overflow_count, 1);
drop(frame);
assert_eq!(pool.idle_count(), 0);
assert_eq!(pool.stats().release_count, 0);
}
#[test]
fn test_frame_byte_size() {
let pool = FramePool::new(video_config(1, 1));
assert_eq!(pool.frame_byte_size(), 4 * 4 * 3); let frame = pool.acquire().expect("frame");
frame.with(|f| assert_eq!(f.len(), 48));
}
#[test]
fn test_frame_data_mut() {
let pool = FramePool::new(video_config(1, 1));
let frame = pool.acquire().expect("frame");
frame.with_mut(|f| {
f.data_mut()[0] = 0xAB;
assert_eq!(f.data()[0], 0xAB);
f.clear();
assert_eq!(f.data()[0], 0x00);
});
}
#[test]
fn test_warm() {
let pool = FramePool::new(video_config(0, 8));
assert_eq!(pool.idle_count(), 0);
let added = pool.warm(4);
assert_eq!(added, 4);
assert_eq!(pool.idle_count(), 4);
}
#[test]
fn test_warm_clamps_to_max() {
let pool = FramePool::new(video_config(2, 4));
let added = pool.warm(10);
assert_eq!(added, 2); assert_eq!(pool.idle_count(), 4);
}
#[test]
fn test_shrink_to() {
let pool = FramePool::new(video_config(6, 8));
let removed = pool.shrink_to(2);
assert_eq!(removed, 4);
assert_eq!(pool.idle_count(), 2);
}
#[test]
fn test_audio_frame_kind() {
let config = FramePoolConfig {
pre_alloc: 2,
max_pool_size: 4,
frame_kind: FrameKind::Audio {
channel_count: 2,
samples_per_frame: 1024,
bytes_per_sample: 4,
},
allow_overflow: false,
};
let pool = FramePool::new(config);
assert_eq!(pool.frame_byte_size(), 2 * 1024 * 4);
}
#[test]
fn test_raw_frame_kind() {
let config = FramePoolConfig {
pre_alloc: 1,
max_pool_size: 2,
frame_kind: FrameKind::Raw { size: 256 },
allow_overflow: false,
};
let pool = FramePool::new(config);
assert_eq!(pool.frame_byte_size(), 256);
}
#[test]
fn test_seq_monotonic() {
let pool = FramePool::new(video_config(4, 8));
let f1 = pool.acquire().expect("f1");
let f2 = pool.acquire().expect("f2");
let s1 = f1.seq();
let s2 = f2.seq();
assert!(s2 > s1);
}
#[test]
fn test_return_over_max_freed() {
let pool = FramePool::new(video_config(0, 1));
let config2 = FramePoolConfig {
allow_overflow: true,
..video_config(0, 1)
};
let pool2 = FramePool::new(config2);
let _f1 = pool2.acquire(); pool2.warm(1);
let f2 = pool2.acquire().expect("pool frame");
drop(f2); let f3 = pool2.acquire().expect("reclaimed frame");
let f4 = pool2.acquire(); drop(f3);
drop(f4);
assert!(pool2.stats().release_count >= 1);
drop(pool);
}
#[test]
fn test_threaded_acquire_release() {
use std::sync::Arc as StdArc;
use std::thread;
let pool = StdArc::new(FramePool::new(video_config(8, 8)));
let mut handles = Vec::new();
for _ in 0..4 {
let p = StdArc::clone(&pool);
handles.push(thread::spawn(move || {
for _ in 0..10 {
if let Some(frame) = p.acquire() {
frame.with_mut(|f| f.data_mut()[0] = 42);
}
}
}));
}
for h in handles {
h.join().expect("thread");
}
let stats = pool.stats();
assert!(stats.acquire_count > 0);
}
}