Skip to main content

rivet/
rung_scaler.rs

//! Per-rung scaler task: consume raw normalized source frames from
//! the shared decode pump, scale them to the rung's dimensions, group
//! every K frames (K = `frames_per_chunk`) into a `SegmentChunk`, and
//! push each chunk to the rung's `SegmentChunkQueue`.
//!
//! v3 multi-GPU model (2026-05-12): one scaler per rung sits between
//! the shared pump and that rung's encoder workers. The pump fans
//! frames out to every scaler's input channel; each scaler does its
//! own bilinear scale (CPU work) and chunks the result so workers
//! see one chunk per CMAF segment.
//!
//! Scalers exit when their input channel returns `None` (the pump has
//! dropped all senders). On exit, the scaler flushes any in-progress
//! chunk (the final, partial segment) and closes the queue so encoder
//! workers drain it and exit cleanly.

16use anyhow::{Context, Result};
17use std::sync::Arc;
18
19use codec::colorspace;
20use codec::frame::VideoFrame;
21
22use crate::frame_queue::{SegmentChunk, SegmentChunkQueue};
23
/// Static configuration for one rung's scaler task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RungScalerConfig {
    /// Index of the rung this scaler serves; included in error messages.
    pub rung_idx: usize,
    /// Output frame width passed to `colorspace::scale_frame`.
    pub target_width: u32,
    /// Output frame height passed to `colorspace::scale_frame`.
    pub target_height: u32,
    /// Frames per segment chunk. Equal to encoder's keyframe_interval.
    /// Must be non-zero; the scaler loop asserts this on startup.
    pub frames_per_chunk: u32,
}

33/// Blocking scaler loop. Designed for `tokio::task::spawn_blocking`.
34/// Returns the total number of segment chunks pushed.
35pub fn run_rung_scaler_blocking(
36    cfg: RungScalerConfig,
37    mut frame_rx: tokio::sync::mpsc::Receiver<VideoFrame>,
38    queue: Arc<SegmentChunkQueue>,
39    rt: tokio::runtime::Handle,
40) -> Result<usize> {
41    let outcome = scaler_loop(&cfg, &mut frame_rx, &queue, &rt);
42    // Always close the queue on exit so encoder workers wake + exit.
43    queue.close();
44    outcome
45}
46
47fn scaler_loop(
48    cfg: &RungScalerConfig,
49    frame_rx: &mut tokio::sync::mpsc::Receiver<VideoFrame>,
50    queue: &Arc<SegmentChunkQueue>,
51    rt: &tokio::runtime::Handle,
52) -> Result<usize> {
53    let chunk_size = cfg.frames_per_chunk as usize;
54    assert!(chunk_size > 0, "frames_per_chunk must be > 0");
55
56    let mut current_chunk: Vec<VideoFrame> = Vec::with_capacity(chunk_size);
57    let mut next_segment_idx: usize = 0;
58    let mut pushed_segments: usize = 0;
59    let mut producer_aborted = false;
60
61    let emit = |chunk_frames: Vec<VideoFrame>, idx: usize, is_final: bool| -> Result<bool> {
62        let chunk = SegmentChunk {
63            segment_idx: idx,
64            frames: chunk_frames,
65            is_final,
66        };
67        let q = Arc::clone(queue);
68        let accepted = rt.block_on(async move { q.push(chunk).await });
69        Ok(accepted)
70    };
71
72    loop {
73        let frame = match rt.block_on(frame_rx.recv()) {
74            Some(f) => f,
75            None => break,
76        };
77        let scaled = colorspace::scale_frame(&frame, cfg.target_width, cfg.target_height)
78            .with_context(|| {
79                format!(
80                    "rung {} scaler: scale_frame to {}×{}",
81                    cfg.rung_idx, cfg.target_width, cfg.target_height
82                )
83            })?;
84        current_chunk.push(scaled);
85        if current_chunk.len() >= chunk_size {
86            let full = std::mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
87            let idx = next_segment_idx;
88            next_segment_idx += 1;
89            if !emit(full, idx, false)? {
90                producer_aborted = true;
91                break;
92            }
93            pushed_segments += 1;
94        }
95    }
96
97    if !producer_aborted && !current_chunk.is_empty() {
98        let idx = next_segment_idx;
99        if emit(current_chunk, idx, true)? {
100            pushed_segments += 1;
101        }
102    }
103
104    Ok(pushed_segments)
105}
106
#[cfg(test)]
mod tests {
    use super::*;

    /// Cloning a config must carry over every field, not just a subset.
    #[test]
    fn config_clone_preserves_fields() {
        let cfg = RungScalerConfig {
            rung_idx: 1,
            target_width: 1280,
            target_height: 720,
            frames_per_chunk: 60,
        };
        let copy = cfg.clone();
        assert_eq!(copy.rung_idx, 1);
        assert_eq!(copy.target_width, 1280);
        assert_eq!(copy.target_height, 720);
        assert_eq!(copy.frames_per_chunk, 60);
    }
}