use crate::error::Result;
use crossbeam_channel::{bounded, Receiver};
use scirs2_core::ndarray::Array2;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
/// One unit of data that flows through a [`StreamPipeline`].
///
/// NOTE(review): `data` is presumably a single-channel image buffer, but the
/// axis order and value range are not established in this file — confirm
/// against the code that constructs frames.
#[derive(Clone)]
pub struct Frame {
    /// Two-dimensional `f32` payload carried by the frame.
    pub data: Array2<f32>,
    /// Instant attached to the frame by whoever constructed it.
    pub timestamp: Instant,
    /// Index assigned to the frame by whoever constructed it.
    pub index: usize,
    /// Optional descriptive metadata; `None` when the producer supplied none.
    pub metadata: Option<FrameMetadata>,
}
/// Descriptive information that may accompany a [`Frame`].
///
/// NOTE(review): units are not stated in this file; `width`/`height` are
/// presumably pixels and `fps` presumably the source frame rate — confirm.
#[derive(Debug, Clone)]
pub struct FrameMetadata {
    /// Frame width.
    pub width: u32,
    /// Frame height.
    pub height: u32,
    /// Frame rate associated with the frame's source.
    pub fps: f32,
    /// Number of channels.
    pub channels: u8,
}
/// A single transformation step in a [`StreamPipeline`].
///
/// Implementors must be `Send + 'static` because the pipeline moves each
/// stage onto its own spawned thread.
pub trait ProcessingStage: Send + 'static {
    /// Consumes `frame` and returns the transformed frame.
    ///
    /// # Errors
    ///
    /// Returns an error when the frame cannot be processed; the pipeline
    /// logs the error and drops that frame instead of forwarding it.
    fn process(&mut self, frame: Frame) -> Result<Frame>;

    /// Human-readable name of the stage, used in the pipeline's error log.
    fn name(&self) -> &str;
}
/// Builder and launcher for a chain of [`ProcessingStage`]s connected by
/// bounded channels.
pub struct StreamPipeline {
    /// Stages in execution order; drained when the stream is started.
    pub(crate) stages: Vec<Box<dyn ProcessingStage>>,
    /// Capacity used for every bounded channel the pipeline creates.
    pub(crate) buffer_size: usize,
    /// Requested thread count.
    ///
    /// NOTE(review): no code in the visible part of this file reads this
    /// field — confirm whether it is consumed elsewhere.
    pub(crate) num_threads: usize,
    /// Metrics shared between the pipeline, its stage threads and the
    /// [`StreamProcessor`] handed back to the caller.
    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
}
/// Counters and timings collected while a pipeline runs.
///
/// All fields start at zero (`Default`).
#[derive(Clone, Default)]
pub struct PipelineMetrics {
    /// Number of successful stage invocations. Every stage thread bumps this
    /// shared counter, so with several stages one frame is counted once per
    /// stage it passes through.
    pub frames_processed: usize,
    /// Running mean of the duration of successful stage invocations.
    pub avg_processing_time: Duration,
    /// Longest single successful stage invocation observed so far.
    pub peak_processing_time: Duration,
    /// Frames-per-second figure.
    ///
    /// NOTE(review): no code in the visible part of this file assigns this
    /// field, so it keeps its default of `0.0` unless set elsewhere.
    pub fps: f32,
    /// Number of frames discarded because a stage returned an error.
    pub dropped_frames: usize,
}
impl Default for StreamPipeline {
fn default() -> Self {
Self::new()
}
}
impl StreamPipeline {
    /// Creates an empty pipeline.
    ///
    /// The channel capacity defaults to 10 frames and `num_threads` to the
    /// CPU count reported by `num_cpus::get()`.
    pub fn new() -> Self {
        Self {
            stages: Vec::new(),
            buffer_size: 10,
            num_threads: num_cpus::get(),
            metrics: Arc::new(Mutex::new(PipelineMetrics::default())),
        }
    }

    /// Sets the capacity of every bounded channel created by
    /// [`process_stream`](Self::process_stream) and returns the pipeline.
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.buffer_size = size;
        self
    }

    /// Stores the requested thread count and returns the pipeline.
    ///
    /// NOTE(review): `process_stream` spawns one thread per stage plus one
    /// feeder thread and does not read this value — confirm whether it is
    /// used elsewhere.
    pub fn with_num_threads(mut self, threads: usize) -> Self {
        self.num_threads = threads;
        self
    }

    /// Appends `stage` to the end of the pipeline and returns the pipeline.
    pub fn add_stage<S: ProcessingStage>(mut self, stage: S) -> Self {
        self.stages.push(Box::new(stage));
        self
    }

    /// Starts processing `input` and returns a handle to the output stream.
    ///
    /// One detached thread is spawned per stage, plus one feeder thread that
    /// pushes `input` into the first channel. Neighbouring threads are linked
    /// by bounded channels of capacity `buffer_size`.
    ///
    /// The configured stages are drained out of `self`, so a later call on
    /// the same pipeline runs without stages and simply forwards its input.
    ///
    /// A frame for which a stage returns an error is logged to stderr,
    /// counted in `dropped_frames` and not forwarded. A stage thread stops
    /// when its input channel is disconnected or when its output channel has
    /// no receiver left.
    pub fn process_stream<I>(&mut self, input: I) -> StreamProcessor
    where
        I: Iterator<Item = Frame> + Send + 'static,
    {
        let capacity = self.buffer_size;
        let metrics = Arc::clone(&self.metrics);

        // `upstream` is always the receiving end of the most recently created
        // channel: first the feeder's, then each stage's output in turn.
        let (input_tx, mut upstream) = bounded(capacity);

        for stage in self.stages.drain(..) {
            let (stage_tx, stage_rx) = bounded(capacity);
            // Hand the previous receiver to this stage and expose this
            // stage's output as the new upstream, without cloning receivers.
            let stage_input = std::mem::replace(&mut upstream, stage_rx);
            let stage_metrics = Arc::clone(&metrics);
            let stage_name = stage.name().to_string();

            thread::spawn(move || {
                let mut stage = stage;
                while let Ok(frame) = stage_input.recv() {
                    let start = Instant::now();
                    match stage.process(frame) {
                        Ok(processed) => {
                            Self::record_success(&stage_metrics, start.elapsed());
                            // Nobody is listening downstream any more.
                            if stage_tx.send(processed).is_err() {
                                break;
                            }
                        }
                        Err(e) => {
                            eprintln!("Stage {stage_name} error: {e}");
                            Self::record_drop(&stage_metrics);
                        }
                    }
                }
            });
        }

        // Feed the source iterator into the head of the chain.
        thread::spawn(move || {
            for frame in input {
                if input_tx.send(frame).is_err() {
                    break;
                }
            }
        });

        StreamProcessor {
            output: upstream,
            metrics,
        }
    }

    /// Returns a snapshot of the metrics collected so far.
    ///
    /// A poisoned lock is recovered rather than treated as fatal: the metrics
    /// are plain counters, so the last written values are still returned.
    pub fn metrics(&self) -> PipelineMetrics {
        self.metrics
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }

    /// Folds one successful stage invocation of length `elapsed` into the
    /// shared metrics (count, running mean and peak).
    fn record_success(metrics: &Mutex<PipelineMetrics>, elapsed: Duration) {
        let mut m = metrics
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        m.frames_processed += 1;
        // Rebuild the previous total from the stored mean, add the new
        // sample, and divide by the updated count.
        let previous_total =
            m.avg_processing_time.as_secs_f64() * (m.frames_processed - 1) as f64;
        m.avg_processing_time = Duration::from_secs_f64(
            (previous_total + elapsed.as_secs_f64()) / m.frames_processed as f64,
        );
        m.peak_processing_time = m.peak_processing_time.max(elapsed);
    }

    /// Counts one frame that was discarded because a stage failed.
    fn record_drop(metrics: &Mutex<PipelineMetrics>) {
        let mut m = metrics
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        m.dropped_frames += 1;
    }
}
/// Handle to the output side of a running pipeline.
pub struct StreamProcessor {
    /// Receiving end of the channel that carries the pipeline's output frames.
    pub(crate) output: Receiver<Frame>,
    /// Metrics shared with the pipeline that produced this handle.
    pub(crate) metrics: Arc<Mutex<PipelineMetrics>>,
}
impl StreamProcessor {
    /// Waits for the next output frame.
    ///
    /// Blocks until a frame is available and returns `None` once the output
    /// channel reports an error, i.e. it is disconnected and empty.
    pub fn next(&self) -> Option<Frame> {
        self.output.recv().ok()
    }

    /// Returns the next output frame if one is ready, without blocking.
    ///
    /// `None` is returned both when no frame is ready yet and when the
    /// stream has ended; the two cases are not distinguished.
    pub fn try_next(&self) -> Option<Frame> {
        self.output.try_recv().ok()
    }

    /// Returns a snapshot of the metrics collected so far.
    ///
    /// A poisoned lock is recovered rather than treated as fatal: the metrics
    /// are plain counters, so the last written values are still returned.
    pub fn metrics(&self) -> PipelineMetrics {
        self.metrics
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}
/// Iterating a [`StreamProcessor`] yields output frames, blocking on each
/// step, and ends when receiving from the output channel fails.
impl Iterator for StreamProcessor {
    type Item = Frame;

    fn next(&mut self) -> Option<Self::Item> {
        match self.output.recv() {
            Ok(frame) => Some(frame),
            Err(_) => None,
        }
    }
}