1use anyhow::{Context, Result};
17use std::sync::Arc;
18
19use codec::colorspace;
20use codec::frame::VideoFrame;
21
22use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
23
24#[derive(Clone)]
25pub struct RungScalerConfig {
26 pub rung_idx: usize,
27 pub target_width: u32,
28 pub target_height: u32,
29 pub frames_per_chunk: u32,
31}
32
33pub fn run_rung_scaler_blocking(
36 cfg: RungScalerConfig,
37 mut frame_rx: tokio::sync::mpsc::Receiver<VideoFrame>,
38 queue: Arc<SegmentChunkQueue>,
39 rt: tokio::runtime::Handle,
40) -> Result<usize> {
41 let outcome = scaler_loop(&cfg, &mut frame_rx, &queue, &rt);
42 queue.close();
44 outcome
45}
46
47fn scaler_loop(
48 cfg: &RungScalerConfig,
49 frame_rx: &mut tokio::sync::mpsc::Receiver<VideoFrame>,
50 queue: &Arc<SegmentChunkQueue>,
51 rt: &tokio::runtime::Handle,
52) -> Result<usize> {
53 let chunk_size = cfg.frames_per_chunk as usize;
54 assert!(chunk_size > 0, "frames_per_chunk must be > 0");
55
56 let mut current_chunk: Vec<VideoFrame> = Vec::with_capacity(chunk_size);
57 let mut next_segment_idx: usize = 0;
58 let mut pushed_segments: usize = 0;
59 let mut producer_aborted = false;
60
61 let emit = |chunk_frames: Vec<VideoFrame>, idx: usize, is_final: bool| -> Result<bool> {
62 let chunk = SegmentChunk {
63 segment_idx: idx,
64 frames: chunk_frames,
65 is_final,
66 };
67 let q = Arc::clone(queue);
68 let accepted = rt.block_on(async move { q.push(chunk).await });
69 Ok(accepted)
70 };
71
72 loop {
73 let frame = match rt.block_on(frame_rx.recv()) {
74 Some(f) => f,
75 None => break,
76 };
77 let scaled = colorspace::scale_frame(&frame, cfg.target_width, cfg.target_height)
78 .with_context(|| {
79 format!(
80 "rung {} scaler: scale_frame to {}×{}",
81 cfg.rung_idx, cfg.target_width, cfg.target_height
82 )
83 })?;
84 current_chunk.push(scaled);
85 if current_chunk.len() >= chunk_size {
86 let full = std::mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
87 let idx = next_segment_idx;
88 next_segment_idx += 1;
89 if !emit(full, idx, false)? {
90 producer_aborted = true;
91 break;
92 }
93 pushed_segments += 1;
94 }
95 }
96
97 if !producer_aborted && !current_chunk.is_empty() {
98 let idx = next_segment_idx;
99 if emit(current_chunk, idx, true)? {
100 pushed_segments += 1;
101 }
102 }
103
104 Ok(pushed_segments)
105}
106
107#[cfg(test)]
108mod tests {
109 use super::*;
110
111 #[test]
112 fn config_clone_preserves_fields() {
113 let cfg = RungScalerConfig {
114 rung_idx: 1,
115 target_width: 1280,
116 target_height: 720,
117 frames_per_chunk: 60,
118 };
119 let copy = cfg.clone();
120 assert_eq!(copy.rung_idx, 1);
121 assert_eq!(copy.frames_per_chunk, 60);
122 }
123}