use anyhow::{Context, Result};
use std::sync::Arc;
use codec::colorspace;
use codec::frame::VideoFrame;
use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
#[derive(Clone)]
pub struct RungScalerConfig {
pub rung_idx: usize,
pub target_width: u32,
pub target_height: u32,
pub frames_per_chunk: u32,
}
pub fn run_rung_scaler_blocking(
cfg: RungScalerConfig,
mut frame_rx: tokio::sync::mpsc::Receiver<VideoFrame>,
queue: Arc<SegmentChunkQueue>,
rt: tokio::runtime::Handle,
) -> Result<usize> {
let outcome = scaler_loop(&cfg, &mut frame_rx, &queue, &rt);
queue.close();
outcome
}
fn scaler_loop(
cfg: &RungScalerConfig,
frame_rx: &mut tokio::sync::mpsc::Receiver<VideoFrame>,
queue: &Arc<SegmentChunkQueue>,
rt: &tokio::runtime::Handle,
) -> Result<usize> {
let chunk_size = cfg.frames_per_chunk as usize;
assert!(chunk_size > 0, "frames_per_chunk must be > 0");
let mut current_chunk: Vec<VideoFrame> = Vec::with_capacity(chunk_size);
let mut next_segment_idx: usize = 0;
let mut pushed_segments: usize = 0;
let mut producer_aborted = false;
let emit = |chunk_frames: Vec<VideoFrame>, idx: usize, is_final: bool| -> Result<bool> {
let chunk = SegmentChunk {
segment_idx: idx,
frames: chunk_frames,
is_final,
};
let q = Arc::clone(queue);
let accepted = rt.block_on(async move { q.push(chunk).await });
Ok(accepted)
};
loop {
let frame = match rt.block_on(frame_rx.recv()) {
Some(f) => f,
None => break,
};
let scaled = colorspace::scale_frame(&frame, cfg.target_width, cfg.target_height)
.with_context(|| {
format!(
"rung {} scaler: scale_frame to {}×{}",
cfg.rung_idx, cfg.target_width, cfg.target_height
)
})?;
current_chunk.push(scaled);
if current_chunk.len() >= chunk_size {
let full = std::mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
let idx = next_segment_idx;
next_segment_idx += 1;
if !emit(full, idx, false)? {
producer_aborted = true;
break;
}
pushed_segments += 1;
}
}
if !producer_aborted && !current_chunk.is_empty() {
let idx = next_segment_idx;
if emit(current_chunk, idx, true)? {
pushed_segments += 1;
}
}
Ok(pushed_segments)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_clone_preserves_fields() {
let cfg = RungScalerConfig {
rung_idx: 1,
target_width: 1280,
target_height: 720,
frames_per_chunk: 60,
};
let copy = cfg.clone();
assert_eq!(copy.rung_idx, 1);
assert_eq!(copy.frames_per_chunk, 60);
}
}