use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PipelineBBox {
pub x1: f32,
pub y1: f32,
pub x2: f32,
pub y2: f32,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PipelineDetection {
pub bbox: PipelineBBox,
pub score: f32,
pub class_id: usize,
}
const SLOT_FREE: u8 = 0;
const SLOT_WRITING: u8 = 1;
const SLOT_CAPTURED: u8 = 2;
const SLOT_PROCESSING: u8 = 3;
const SLOT_READY: u8 = 4;
struct SlotPayload {
data: Vec<u8>,
width: u32,
height: u32,
pixel_format: u8,
timestamp_us: u64,
detections: Vec<PipelineDetection>,
}
pub struct FrameSlot {
payload: UnsafeCell<SlotPayload>,
state: AtomicU8,
}
unsafe impl Sync for FrameSlot {}
pub struct SlotRef<'a> {
payload: &'a SlotPayload,
}
impl SlotRef<'_> {
pub fn data(&self) -> &[u8] {
&self.payload.data
}
pub fn width(&self) -> u32 {
self.payload.width
}
pub fn height(&self) -> u32 {
self.payload.height
}
pub fn pixel_format(&self) -> u8 {
self.payload.pixel_format
}
pub fn timestamp_us(&self) -> u64 {
self.payload.timestamp_us
}
pub fn detections(&self) -> &[PipelineDetection] {
&self.payload.detections
}
}
pub struct SlotMut<'a> {
payload: &'a mut SlotPayload,
}
impl SlotMut<'_> {
pub fn data_mut(&mut self) -> &mut Vec<u8> {
&mut self.payload.data
}
pub fn data(&self) -> &[u8] {
&self.payload.data
}
pub fn set_width(&mut self, w: u32) {
self.payload.width = w;
}
pub fn set_height(&mut self, h: u32) {
self.payload.height = h;
}
pub fn set_pixel_format(&mut self, fmt: u8) {
self.payload.pixel_format = fmt;
}
pub fn set_timestamp_us(&mut self, ts: u64) {
self.payload.timestamp_us = ts;
}
pub fn detections_mut(&mut self) -> &mut Vec<PipelineDetection> {
&mut self.payload.detections
}
pub fn detections(&self) -> &[PipelineDetection] {
&self.payload.detections
}
pub fn width(&self) -> u32 {
self.payload.width
}
pub fn height(&self) -> u32 {
self.payload.height
}
pub fn pixel_format(&self) -> u8 {
self.payload.pixel_format
}
pub fn timestamp_us(&self) -> u64 {
self.payload.timestamp_us
}
}
pub struct FramePipeline {
slots: Vec<FrameSlot>,
capacity: usize,
write_pos: AtomicUsize,
read_pos: AtomicUsize,
output_pos: AtomicUsize,
}
unsafe impl Sync for FramePipeline {}
impl FramePipeline {
pub fn new(capacity: usize, max_frame_bytes: usize) -> Self {
assert!(capacity > 0, "FramePipeline capacity must be > 0");
let slots: Vec<FrameSlot> = (0..capacity)
.map(|_| FrameSlot {
payload: UnsafeCell::new(SlotPayload {
data: vec![0u8; max_frame_bytes],
width: 0,
height: 0,
pixel_format: 0,
timestamp_us: 0,
detections: Vec::new(),
}),
state: AtomicU8::new(SLOT_FREE),
})
.collect();
Self {
slots,
capacity,
write_pos: AtomicUsize::new(0),
read_pos: AtomicUsize::new(0),
output_pos: AtomicUsize::new(0),
}
}
pub fn try_acquire_write(&self) -> Option<SlotMut<'_>> {
let idx = self.write_pos.load(Ordering::Relaxed) % self.capacity;
let slot = &self.slots[idx];
if slot
.state
.compare_exchange(
SLOT_FREE,
SLOT_WRITING,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let payload = unsafe { &mut *slot.payload.get() };
Some(SlotMut { payload })
} else {
None
}
}
pub fn commit_write(&self) {
let idx = self.write_pos.load(Ordering::Relaxed) % self.capacity;
self.slots[idx]
.state
.store(SLOT_CAPTURED, Ordering::Release);
self.write_pos.fetch_add(1, Ordering::Relaxed);
}
pub fn rollback_write(&self) {
let idx = self.write_pos.load(Ordering::Relaxed) % self.capacity;
self.slots[idx].state.store(SLOT_FREE, Ordering::Release);
}
pub fn try_acquire_read(&self) -> Option<SlotMut<'_>> {
let idx = self.read_pos.load(Ordering::Relaxed) % self.capacity;
let slot = &self.slots[idx];
if slot
.state
.compare_exchange(
SLOT_CAPTURED,
SLOT_PROCESSING,
Ordering::Acquire,
Ordering::Relaxed,
)
.is_ok()
{
let payload = unsafe { &mut *slot.payload.get() };
Some(SlotMut { payload })
} else {
None
}
}
pub fn commit_read(&self) {
let idx = self.read_pos.load(Ordering::Relaxed) % self.capacity;
self.slots[idx].state.store(SLOT_READY, Ordering::Release);
self.read_pos.fetch_add(1, Ordering::Relaxed);
}
pub fn try_acquire_output(&self) -> Option<SlotRef<'_>> {
let idx = self.output_pos.load(Ordering::Relaxed) % self.capacity;
let slot = &self.slots[idx];
if slot.state.load(Ordering::Acquire) == SLOT_READY {
let payload = unsafe { &*slot.payload.get() };
Some(SlotRef { payload })
} else {
None
}
}
pub fn commit_output(&self) {
let idx = self.output_pos.load(Ordering::Relaxed) % self.capacity;
self.slots[idx].state.store(SLOT_FREE, Ordering::Release);
self.output_pos.fetch_add(1, Ordering::Relaxed);
}
pub fn capacity(&self) -> usize {
self.capacity
}
}
pub fn run_pipeline<C, P, O>(
pipeline: &FramePipeline,
capture: C,
process: P,
output: O,
max_frames: usize,
) -> PipelineStats
where
C: FnMut(&mut SlotMut<'_>) -> bool + Send,
P: FnMut(&mut SlotMut<'_>) + Send,
O: FnMut(&SlotRef<'_>) + Send,
{
let capture_done = AtomicBool::new(false);
let captured_count = AtomicUsize::new(0);
let processed_count = AtomicUsize::new(0);
let outputted_count = AtomicUsize::new(0);
let panic_capture = AtomicUsize::new(0);
let panic_process = AtomicUsize::new(0);
let panic_output = AtomicUsize::new(0);
let capture = std::sync::Mutex::new(capture);
let process = std::sync::Mutex::new(process);
let output = std::sync::Mutex::new(output);
std::thread::scope(|s| {
s.spawn(|| {
let mut produced = 0usize;
while produced < max_frames {
if let Some(mut slot) = pipeline.try_acquire_write() {
let keep_going = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut cap = capture.lock().unwrap_or_else(|e| e.into_inner());
(*cap)(&mut slot)
}));
match keep_going {
Ok(true) => {
pipeline.commit_write();
produced += 1;
captured_count.store(produced, Ordering::Release);
}
Ok(false) => {
pipeline.rollback_write();
break;
}
Err(payload) => {
let msg = panic_message(&payload);
eprintln!("[yscv-video] capture stage panicked: {msg} — stopping");
panic_capture.fetch_add(1, Ordering::Relaxed);
pipeline.rollback_write();
break; }
}
} else {
std::hint::spin_loop();
}
}
capture_done.store(true, Ordering::Release);
});
s.spawn(|| {
let mut done = 0usize;
loop {
if let Some(mut slot) = pipeline.try_acquire_read() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut proc = process.lock().unwrap_or_else(|e| e.into_inner());
(*proc)(&mut slot);
}));
if let Err(payload) = result {
let msg = panic_message(&payload);
eprintln!(
"[yscv-video] process stage panicked on frame {done}: {msg} — continuing with empty detections"
);
panic_process.fetch_add(1, Ordering::Relaxed);
slot.detections_mut().clear();
}
pipeline.commit_read();
done += 1;
processed_count.store(done, Ordering::Release);
} else if capture_done.load(Ordering::Acquire)
&& done >= captured_count.load(Ordering::Acquire)
{
return;
} else {
std::hint::spin_loop();
}
}
});
s.spawn(|| {
let mut done = 0usize;
loop {
if let Some(slot_ref) = pipeline.try_acquire_output() {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut out = output.lock().unwrap_or_else(|e| e.into_inner());
(*out)(&slot_ref);
}));
if let Err(payload) = result {
let msg = panic_message(&payload);
eprintln!(
"[yscv-video] output stage panicked on frame {done}: {msg} — frame dropped"
);
panic_output.fetch_add(1, Ordering::Relaxed);
}
pipeline.commit_output();
done += 1;
outputted_count.store(done, Ordering::Release);
} else if capture_done.load(Ordering::Acquire)
&& done >= captured_count.load(Ordering::Acquire)
{
return;
} else {
std::hint::spin_loop();
}
}
});
});
PipelineStats {
captured: captured_count.load(Ordering::Relaxed),
processed: processed_count.load(Ordering::Relaxed),
outputted: outputted_count.load(Ordering::Relaxed),
panics_capture: panic_capture.load(Ordering::Relaxed),
panics_process: panic_process.load(Ordering::Relaxed),
panics_output: panic_output.load(Ordering::Relaxed),
capture_latency: crate::latency_histogram::LatencyHistogram::new(),
process_latency: crate::latency_histogram::LatencyHistogram::new(),
output_latency: crate::latency_histogram::LatencyHistogram::new(),
}
}
#[derive(Debug, Default)]
pub struct PipelineStats {
pub captured: usize,
pub processed: usize,
pub outputted: usize,
pub panics_capture: usize,
pub panics_process: usize,
pub panics_output: usize,
pub capture_latency: crate::latency_histogram::LatencyHistogram,
pub process_latency: crate::latency_histogram::LatencyHistogram,
pub output_latency: crate::latency_histogram::LatencyHistogram,
}
#[derive(Debug, Clone, Copy, Default)]
pub struct PipelineLatencySnapshot {
pub capture: crate::latency_histogram::LatencyQuantiles,
pub process: crate::latency_histogram::LatencyQuantiles,
pub output: crate::latency_histogram::LatencyQuantiles,
}
impl PipelineStats {
pub fn latency_snapshot(&self) -> PipelineLatencySnapshot {
PipelineLatencySnapshot {
capture: self.capture_latency.snapshot(),
process: self.process_latency.snapshot(),
output: self.output_latency.snapshot(),
}
}
}
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> &str {
if let Some(s) = payload.downcast_ref::<&'static str>() {
s
} else if let Some(s) = payload.downcast_ref::<String>() {
s.as_str()
} else {
"<non-string panic payload>"
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn pipeline_basic() {
let pipeline = FramePipeline::new(4, 16);
let total = 10usize;
let capture_idx = AtomicUsize::new(0);
let output_count = AtomicUsize::new(0);
run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
slot.set_timestamp_us(i as u64);
slot.data_mut()[0] = i as u8;
true
},
|slot: &mut SlotMut<'_>| {
let v = slot.data()[0];
slot.data_mut()[0] = v.wrapping_mul(2);
},
|slot: &SlotRef<'_>| {
let ts = slot.timestamp_us() as u8;
let processed_val = slot.data()[0];
assert_eq!(processed_val, ts.wrapping_mul(2));
output_count.fetch_add(1, Ordering::Relaxed);
},
total,
);
assert_eq!(output_count.load(Ordering::Relaxed), total);
}
#[test]
fn pipeline_zero_alloc() {
let max_bytes = 128;
let pipeline = FramePipeline::new(4, max_bytes);
let initial_data_caps: Vec<usize> = pipeline
.slots
.iter()
.map(|slot| {
let p = unsafe { &*slot.payload.get() };
p.data.capacity()
})
.collect();
let capture_idx = AtomicUsize::new(0);
let total = 20usize;
run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
let len = slot.data().len().min(max_bytes);
slot.data_mut()[..len]
.iter_mut()
.enumerate()
.for_each(|(j, b)| *b = (i + j) as u8);
slot.set_timestamp_us(i as u64);
true
},
|slot: &mut SlotMut<'_>| {
slot.detections_mut().clear();
},
|_slot: &SlotRef<'_>| {},
total,
);
let final_data_caps: Vec<usize> = pipeline
.slots
.iter()
.map(|slot| {
let p = unsafe { &*slot.payload.get() };
p.data.capacity()
})
.collect();
assert_eq!(
initial_data_caps, final_data_caps,
"data buffers must not reallocate during steady-state"
);
}
#[test]
fn pipeline_concurrent() {
let pipeline = FramePipeline::new(8, 256);
let total = 100usize;
let capture_idx = AtomicUsize::new(0);
let output_sum = AtomicUsize::new(0);
run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
slot.set_timestamp_us(i as u64);
slot.data_mut()[0] = (i & 0xFF) as u8;
true
},
|slot: &mut SlotMut<'_>| {
let v = slot.data()[0];
slot.data_mut()[0] = v.wrapping_add(1);
},
|slot: &SlotRef<'_>| {
output_sum.fetch_add(slot.data()[0] as usize, Ordering::Relaxed);
},
total,
);
let expected: usize = (0..total).map(|i| (i & 0xFF) + 1).sum();
assert_eq!(output_sum.load(Ordering::Relaxed), expected);
}
#[test]
fn pipeline_early_stop() {
let pipeline = FramePipeline::new(4, 16);
let capture_idx = AtomicUsize::new(0);
let output_count = AtomicUsize::new(0);
let early_stop_at = 3usize;
run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= early_stop_at {
return false;
}
slot.set_timestamp_us(i as u64);
true
},
|_slot: &mut SlotMut<'_>| {},
|_slot: &SlotRef<'_>| {
output_count.fetch_add(1, Ordering::Relaxed);
},
1000, );
assert_eq!(output_count.load(Ordering::Relaxed), early_stop_at);
}
#[test]
fn pipeline_single_slot() {
let pipeline = FramePipeline::new(1, 8);
let total = 5usize;
let capture_idx = AtomicUsize::new(0);
let output_count = AtomicUsize::new(0);
run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
slot.set_timestamp_us(i as u64);
true
},
|_slot: &mut SlotMut<'_>| {},
|_slot: &SlotRef<'_>| {
output_count.fetch_add(1, Ordering::Relaxed);
},
total,
);
assert_eq!(output_count.load(Ordering::Relaxed), total);
}
#[test]
fn pipeline_survives_process_panic() {
let pipeline = FramePipeline::new(4, 16);
let total = 20usize;
let capture_idx = AtomicUsize::new(0);
let output_count = AtomicUsize::new(0);
let stats = run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
slot.set_timestamp_us(i as u64);
true
},
|slot: &mut SlotMut<'_>| {
let idx = slot.timestamp_us() as usize;
if idx % 5 == 4 {
panic!("intentional test panic at frame {idx}");
}
},
|_slot: &SlotRef<'_>| {
output_count.fetch_add(1, Ordering::Relaxed);
},
total,
);
assert_eq!(stats.captured, total);
assert_eq!(
stats.processed, total,
"all frames flow through despite panics"
);
assert_eq!(stats.outputted, total);
assert_eq!(stats.panics_process, 4, "frames 4, 9, 14, 19 panic");
assert_eq!(stats.panics_capture, 0);
assert_eq!(stats.panics_output, 0);
assert_eq!(output_count.load(Ordering::Relaxed), total);
}
#[test]
fn pipeline_survives_output_panic() {
let pipeline = FramePipeline::new(4, 16);
let total = 10usize;
let capture_idx = AtomicUsize::new(0);
let stats = run_pipeline(
&pipeline,
|slot: &mut SlotMut<'_>| {
let i = capture_idx.fetch_add(1, Ordering::Relaxed);
if i >= total {
return false;
}
slot.set_timestamp_us(i as u64);
true
},
|_slot: &mut SlotMut<'_>| {},
|slot: &SlotRef<'_>| {
if slot.timestamp_us() == 3 {
panic!("output panic at frame 3");
}
},
total,
);
assert_eq!(stats.captured, total);
assert_eq!(stats.processed, total);
assert_eq!(stats.outputted, total);
assert_eq!(stats.panics_output, 1);
}
}