use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Instant;
use nv_frame::FrameEnvelope;
use crate::backpressure::BackpressurePolicy;
/// Outcome reported by [`FrameQueue::push`] for a single frame.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PushOutcome {
    /// The frame was appended to the queue without displacing anything.
    Accepted,
    /// The queue was full under the `DropOldest` policy: the frame at the
    /// front was evicted and the new frame was appended in its place.
    DroppedOldest,
    /// The frame was not enqueued, either because the queue is closed or
    /// because it was full under the `DropNewest` policy.
    Rejected,
}
/// Outcome reported by [`FrameQueue::pop`].
#[derive(Debug)]
pub(crate) enum PopResult {
    /// A frame taken from the front of the queue, together with the time
    /// elapsed since the enqueue timestamp that `push` stored alongside it.
    Frame(FrameEnvelope, std::time::Duration),
    /// The queue was empty and either it had been closed or the caller's
    /// shutdown flag was set.
    Closed,
    /// The queue was still empty when the caller-supplied deadline passed.
    Timeout,
    /// The queue was empty and a wake-up had been requested through
    /// `wake_consumer`.
    Wake,
}
/// Mutable queue state, stored inside the `Mutex` owned by [`FrameQueue`].
struct QueueInner {
    /// Buffered frames, oldest at the front, each paired with the `Instant`
    /// recorded when it was pushed.
    buf: VecDeque<(Instant, FrameEnvelope)>,
    /// Set by `close`; once set, `push` rejects new frames.
    closed: bool,
    /// Set by `wake_consumer`; cleared by the `pop` call that reports `Wake`.
    woken: bool,
    /// Number of frames offered to `push` while the queue was open.
    total_received: u64,
    /// Number of frames that `push` counted as dropped.
    total_dropped: u64,
}
/// Bounded frame queue guarded by a mutex and two condition variables.
///
/// Producers call `push`, consumers call `pop`; what happens when the queue
/// is full is decided by the configured [`BackpressurePolicy`].
pub(crate) struct FrameQueue {
    /// Buffer, flags and counters, all protected by one lock.
    inner: Mutex<QueueInner>,
    /// Notified when a frame is enqueued, on `wake_consumer`, and on `close`.
    not_empty: Condvar,
    /// Notified when a frame is popped and on `close`; producers blocked
    /// under the `Block` policy wait on it.
    not_full: Condvar,
    /// Maximum number of buffered frames, taken from `policy.queue_depth()`.
    depth: usize,
    /// Overflow behaviour applied by `push` when the buffer is full.
    policy: BackpressurePolicy,
}
impl FrameQueue {
pub fn new(policy: BackpressurePolicy) -> Self {
let depth = policy.queue_depth();
Self {
inner: Mutex::new(QueueInner {
buf: VecDeque::with_capacity(depth),
closed: false,
woken: false,
total_received: 0,
total_dropped: 0,
}),
not_empty: Condvar::new(),
not_full: Condvar::new(),
depth,
policy,
}
}
pub fn push(&self, frame: FrameEnvelope) -> PushOutcome {
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
if inner.closed {
tracing::debug!("FrameQueue::push — rejected: queue is closed");
return PushOutcome::Rejected;
}
inner.total_received += 1;
let now = Instant::now();
if inner.buf.len() < self.depth {
inner.buf.push_back((now, frame));
self.not_empty.notify_one();
return PushOutcome::Accepted;
}
match self.policy {
BackpressurePolicy::DropOldest { .. } => {
inner.buf.pop_front();
inner.total_dropped += 1;
inner.buf.push_back((now, frame));
self.not_empty.notify_one();
PushOutcome::DroppedOldest
}
BackpressurePolicy::DropNewest { .. } => {
inner.total_dropped += 1;
PushOutcome::Rejected
}
BackpressurePolicy::Block { .. } => {
while inner.buf.len() >= self.depth && !inner.closed {
inner = self.not_full.wait(inner).unwrap_or_else(|e| e.into_inner());
}
if inner.closed {
return PushOutcome::Rejected;
}
inner.buf.push_back((now, frame));
self.not_empty.notify_one();
PushOutcome::Accepted
}
}
}
pub fn pop(&self, shutdown: &AtomicBool, deadline: Option<Instant>) -> PopResult {
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
loop {
if let Some((push_time, frame)) = inner.buf.pop_front() {
self.not_full.notify_one();
let hold_time = push_time.elapsed();
return PopResult::Frame(frame, hold_time);
}
if inner.closed || shutdown.load(Ordering::Relaxed) {
return PopResult::Closed;
}
if inner.woken {
inner.woken = false;
return PopResult::Wake;
}
match deadline {
Some(dl) => {
let now = Instant::now();
if now >= dl {
return PopResult::Timeout;
}
let (guard, result) = self
.not_empty
.wait_timeout(inner, dl - now)
.unwrap_or_else(|e| e.into_inner());
inner = guard;
if result.timed_out() && inner.buf.is_empty() {
if inner.closed || shutdown.load(Ordering::Relaxed) {
return PopResult::Closed;
}
if inner.woken {
inner.woken = false;
return PopResult::Wake;
}
return PopResult::Timeout;
}
}
None => {
inner = self
.not_empty
.wait(inner)
.unwrap_or_else(|e| e.into_inner());
}
}
}
}
pub fn wake_consumer(&self) {
{
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
inner.woken = true;
}
self.not_empty.notify_all();
}
pub fn close(&self) {
tracing::debug!("FrameQueue::close — closing queue");
let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
inner.closed = true;
self.not_empty.notify_all();
self.not_full.notify_all();
}
pub(crate) fn depth(&self) -> usize {
self.inner
.lock()
.unwrap_or_else(|e| e.into_inner())
.buf
.len()
}
#[must_use]
pub(crate) fn capacity(&self) -> usize {
self.depth
}
}
/// Test-only accessors for the queue's counters and current length.
#[cfg(test)]
impl FrameQueue {
    /// Returns `(total_received, total_dropped)` as recorded by `push`.
    ///
    /// Panics if the lock is poisoned.
    pub fn stats(&self) -> (u64, u64) {
        let guard = self.inner.lock().unwrap();
        let received = guard.total_received;
        let dropped = guard.total_dropped;
        (received, dropped)
    }

    /// Returns the number of frames currently buffered.
    ///
    /// Panics if the lock is poisoned.
    pub fn len(&self) -> usize {
        let guard = self.inner.lock().unwrap();
        guard.buf.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::{FeedId, MonotonicTs, TypedMetadata, WallTs};
    use nv_frame::PixelFormat;
    use std::sync::Arc;
    use std::thread::{self, JoinHandle};
    use std::time::Duration;

    /// Builds a small owned RGB test frame whose sequence number is `seq`.
    fn test_frame(seq: u64) -> FrameEnvelope {
        let ts = MonotonicTs::from_nanos(seq * 33_333_333);
        let pixels = vec![0u8; 12];
        FrameEnvelope::new_owned(
            FeedId::new(1),
            seq,
            ts,
            WallTs::from_micros(0),
            2,
            2,
            PixelFormat::Rgb8,
            6,
            pixels,
            TypedMetadata::new(),
        )
    }

    /// Queue using the `DropOldest` policy with the given depth.
    fn drop_oldest_queue(queue_depth: usize) -> FrameQueue {
        FrameQueue::new(BackpressurePolicy::DropOldest { queue_depth })
    }

    /// Pops without a deadline or shutdown request and requires a frame.
    fn pop_frame(q: &FrameQueue) -> (FrameEnvelope, Duration) {
        let shutdown = AtomicBool::new(false);
        match q.pop(&shutdown, None) {
            PopResult::Frame(frame, hold) => (frame, hold),
            PopResult::Closed | PopResult::Timeout | PopResult::Wake => {
                panic!("expected frame")
            }
        }
    }

    /// Runs a deadline-free `pop` on a background thread.
    fn spawn_pop(q: &Arc<FrameQueue>, shutdown: &Arc<AtomicBool>) -> JoinHandle<PopResult> {
        let q = Arc::clone(q);
        let shutdown = Arc::clone(shutdown);
        thread::spawn(move || q.pop(&shutdown, None))
    }

    #[test]
    fn push_pop_basic() {
        let q = drop_oldest_queue(4);
        for seq in 0..2 {
            assert_eq!(q.push(test_frame(seq)), PushOutcome::Accepted);
        }
        assert_eq!(q.len(), 2);

        let (frame, _hold) = pop_frame(&q);
        assert_eq!(frame.seq(), 0);
        assert_eq!(q.len(), 1);
    }

    #[test]
    fn drop_oldest_evicts_when_full() {
        let q = drop_oldest_queue(2);
        for seq in 0..2 {
            assert_eq!(q.push(test_frame(seq)), PushOutcome::Accepted);
        }
        assert_eq!(q.push(test_frame(2)), PushOutcome::DroppedOldest);
        assert_eq!(q.len(), 2);

        // Frame 0 was evicted, so frame 1 is now at the front.
        let (frame, _hold) = pop_frame(&q);
        assert_eq!(frame.seq(), 1);

        let (received, dropped) = q.stats();
        assert_eq!(received, 3);
        assert_eq!(dropped, 1);
    }

    #[test]
    fn drop_newest_rejects_when_full() {
        let q = FrameQueue::new(BackpressurePolicy::DropNewest { queue_depth: 2 });
        for seq in 0..2 {
            assert_eq!(q.push(test_frame(seq)), PushOutcome::Accepted);
        }
        assert_eq!(q.push(test_frame(2)), PushOutcome::Rejected);
        assert_eq!(q.len(), 2);

        // The buffered frames are untouched, so frame 0 is still first.
        let (frame, _hold) = pop_frame(&q);
        assert_eq!(frame.seq(), 0);
    }

    #[test]
    fn close_wakes_consumer() {
        let q = Arc::new(FrameQueue::new(BackpressurePolicy::default()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let consumer = spawn_pop(&q, &shutdown);

        // Give the consumer time to start waiting on the empty queue.
        thread::sleep(Duration::from_millis(50));
        q.close();

        let result = consumer.join().unwrap();
        assert!(
            matches!(result, PopResult::Closed),
            "pop should return Closed after close"
        );
    }

    #[test]
    fn shutdown_wakes_consumer() {
        let q = Arc::new(FrameQueue::new(BackpressurePolicy::default()));
        let shutdown = Arc::new(AtomicBool::new(false));
        let consumer = spawn_pop(&q, &shutdown);

        thread::sleep(Duration::from_millis(50));
        // Setting the flag alone does not wake the consumer; the explicit
        // wake-up makes it re-read the flag.
        shutdown.store(true, Ordering::Relaxed);
        q.wake_consumer();

        let result = consumer.join().unwrap();
        assert!(
            matches!(result, PopResult::Closed),
            "pop should return Closed after shutdown"
        );
    }

    #[test]
    fn block_policy_waits_for_space() {
        let q = Arc::new(FrameQueue::new(BackpressurePolicy::Block {
            queue_depth: 1,
        }));
        assert_eq!(q.push(test_frame(0)), PushOutcome::Accepted);

        let producer = {
            let q = Arc::clone(&q);
            thread::spawn(move || q.push(test_frame(1)))
        };
        thread::sleep(Duration::from_millis(50));
        // The second push must not have entered the full queue.
        assert_eq!(q.len(), 1);

        // Freeing the slot lets the blocked producer finish.
        let _ = pop_frame(&q);
        let outcome = producer.join().unwrap();
        assert_eq!(outcome, PushOutcome::Accepted);
    }

    #[test]
    fn pop_with_deadline_returns_timeout() {
        let q = drop_oldest_queue(4);
        let shutdown = AtomicBool::new(false);
        let deadline = Instant::now() + Duration::from_millis(10);

        let result = q.pop(&shutdown, Some(deadline));
        assert!(
            matches!(result, PopResult::Timeout),
            "expected Timeout on empty queue with deadline"
        );
    }

    #[test]
    fn push_after_close_is_rejected() {
        let q = FrameQueue::new(BackpressurePolicy::default());
        q.close();
        assert_eq!(q.push(test_frame(0)), PushOutcome::Rejected);
    }

    #[test]
    fn depth_and_capacity() {
        let q = drop_oldest_queue(4);
        assert_eq!(q.capacity(), 4);
        assert_eq!(q.depth(), 0);

        for seq in 0..2 {
            q.push(test_frame(seq));
        }
        assert_eq!(q.depth(), 2);

        let shutdown = AtomicBool::new(false);
        for remaining in [1, 0] {
            let _ = q.pop(&shutdown, None);
            assert_eq!(q.depth(), remaining);
        }
    }

    #[test]
    fn depth_under_backpressure() {
        let q = drop_oldest_queue(2);
        for seq in 0..2 {
            q.push(test_frame(seq));
        }
        assert_eq!(q.depth(), 2);

        // An overflowing push evicts one frame, so depth stays at capacity.
        q.push(test_frame(2));
        assert_eq!(q.depth(), 2);
        assert_eq!(q.capacity(), 2);
    }

    #[test]
    fn depth_returns_zero_after_close() {
        let q = drop_oldest_queue(4);
        for seq in 0..2 {
            q.push(test_frame(seq));
        }
        q.close();
        // Closing does not discard buffered frames.
        assert_eq!(q.depth(), 2);

        let shutdown = AtomicBool::new(false);
        for _ in 0..2 {
            let _ = q.pop(&shutdown, None);
        }
        assert_eq!(q.depth(), 0);
    }

    #[test]
    fn pop_returns_nonzero_hold_time() {
        let q = drop_oldest_queue(4);
        q.push(test_frame(0));
        thread::sleep(Duration::from_millis(5));

        let (_frame, hold) = pop_frame(&q);
        assert!(
            hold >= Duration::from_millis(4),
            "hold time should be >= 4ms, got {hold:?}"
        );
    }
}