Skip to main content

rivet/
decode_pump.rs

//! Shared source decode pump.
//!
//! One pump per job (not per rung): demux + decode the source **once**, run
//! the rung-agnostic per-frame work (4:4:4 → 4:2:0 downsample + HDR tonemap),
//! and fan the normalized frame out to N per-rung mpsc channels via cheap
//! `VideoFrame::clone()` (the inner `Bytes` is `Arc`-backed).
//!
//! Per-rung scaling + encoding consume from those channels. Eliminating the
//! redundant per-rung decode is the whole point — a 5-rung ladder decodes the
//! source once, not five times. The cost: the slowest rung backpressures the
//! pump (usually the largest rung, whose encoder is slowest).
12
13use anyhow::{Context, Result};
14use bytes::Bytes;
15
16use codec::frame::{ColorMetadata, PixelFormat, VideoFrame};
17use codec::{colorspace, decode};
18use container::streaming;
19
20/// Configuration for one decode pump.
21#[derive(Clone)]
22pub struct DecodePumpConfig {
23    /// Source video codec label (e.g. `"h264"`).
24    pub codec_name: String,
25    /// Stream info handed to the decoder.
26    pub info_for_decoder: codec::frame::StreamInfo,
27    /// Source color metadata (drives HDR-aware tonemap vs SDR passthrough).
28    pub source_color_metadata: ColorMetadata,
29    /// Source pixel format.
30    pub source_pixel_format: PixelFormat,
31    /// Whether to run the 4:4:4 → 4:2:0 downsample per frame.
32    pub needs_downsample: bool,
33    /// Tonemap policy (from the [`OutputSpec`](crate::spec::OutputSpec)): when
34    /// `true`, HDR (PQ/HLG) sources are mapped down to 8-bit SDR BT.709; when
35    /// `false`, the source color/transfer/bit-depth passes through unchanged.
36    /// The pump does not decide this on its own — the caller sets it from the
37    /// spec's [`ColorPolicy`](crate::spec::ColorPolicy).
38    pub tonemap_to_sdr: bool,
39    /// Pin the decoder to this physical GPU; `None` = first matching adapter.
40    pub gpu_index: Option<u32>,
41    /// Prepared per-frame video filter chain (crop/pad/flip/rotate/grayscale/
42    /// overlay/colour), applied after colorspace normalize and before the frame
43    /// is fanned out to the per-rung scalers. Overlay images are loaded once at
44    /// prepare time. `Arc` so the per-GPU pump configs clone it cheaply.
45    pub filters: std::sync::Arc<codec::filter::FilterChain>,
46}
47
48/// Blocking decode loop, designed for `tokio::task::spawn_blocking`. Fans
49/// every normalized frame out to all `senders`. If a sender's channel is
50/// closed (its rung gave up) the pump continues with the rest; it stops only
51/// when *every* sender is closed. `rt` bridges into the async `send().await`.
52///
53/// Returns the number of frames pushed.
54pub fn run_shared_decode_pump_blocking(
55    cfg: DecodePumpConfig,
56    input_data: Bytes,
57    senders: Vec<tokio::sync::mpsc::Sender<VideoFrame>>,
58    rt: tokio::runtime::Handle,
59) -> Result<u64> {
60    let outcome = decode_loop(&cfg, input_data, &senders, &rt);
61    // Drop senders so receivers wake and exit.
62    drop(senders);
63    outcome
64}
65
66fn decode_loop(
67    cfg: &DecodePumpConfig,
68    input_data: Bytes,
69    senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
70    rt: &tokio::runtime::Handle,
71) -> Result<u64> {
72    let mut demuxer =
73        streaming::demux_streaming(&input_data).context("demuxing input for shared decode pump")?;
74    let mut decoder =
75        decode::create_decoder_on(&cfg.codec_name, cfg.info_for_decoder.clone(), cfg.gpu_index)
76            .context("creating decoder for shared decode pump")?;
77
78    let mut frames_pushed: u64 = 0;
79    'outer: loop {
80        match demuxer
81            .next_video_sample()
82            .context("demuxing next video sample in shared decode pump")?
83        {
84            Some(sample) => {
85                decoder
86                    .push_sample(&sample.data)
87                    .context("pushing sample to shared decode pump decoder")?;
88                while let Some(frame) = decoder
89                    .decode_next()
90                    .context("decoding frame in shared decode pump")?
91                {
92                    let normalized = normalize_frame(cfg, frame)?;
93                    if !fan_out(senders, normalized, rt)? {
94                        break 'outer;
95                    }
96                    frames_pushed += 1;
97                }
98            }
99            None => {
100                decoder
101                    .finish()
102                    .context("decoder finish in shared decode pump")?;
103                while let Some(frame) = decoder
104                    .decode_next()
105                    .context("decoding frame after finish in shared decode pump")?
106                {
107                    let normalized = normalize_frame(cfg, frame)?;
108                    if !fan_out(senders, normalized, rt)? {
109                        break;
110                    }
111                    frames_pushed += 1;
112                }
113                break;
114            }
115        }
116    }
117
118    Ok(frames_pushed)
119}
120
121/// Rung-agnostic per-frame work: 4:4:4 → 4:2:0 downsample (if needed) then,
122/// when the spec's color policy asks for it (`tonemap_to_sdr`), an HDR-aware
123/// colorspace convert (tonemap PQ/HLG → SDR BT.709, identity for SDR). When the
124/// policy is passthrough/HDR, the downsampled source is forwarded unchanged.
125/// Per-rung scaling is NOT done here.
126fn normalize_frame(cfg: &DecodePumpConfig, frame: VideoFrame) -> Result<VideoFrame> {
127    let downsampled = if cfg.needs_downsample {
128        colorspace::downsample_444_to_420_frame(&frame)
129            .context("shared decode pump 4:4:4 → 4:2:0 downsample")?
130    } else {
131        frame
132    };
133    let normalized = if !cfg.tonemap_to_sdr {
134        // Passthrough / HDR output: preserve the source color + bit depth.
135        downsampled
136    } else {
137        colorspace::convert_to_sdr_bt709(&downsampled, &cfg.source_color_metadata)
138            .context("shared decode pump colorspace convert (HDR-aware)")?
139    };
140    // Video filters (crop/pad/flip/rotate/grayscale/overlay/colour) run on the
141    // normalized 4:2:0 frame, before the per-rung scalers see it.
142    if cfg.filters.is_empty() {
143        Ok(normalized)
144    } else {
145        cfg.filters.apply(normalized).context("shared decode pump video filters")
146    }
147}
148
149/// Fan one frame out to every sender. Cloning `VideoFrame` is cheap (inner
150/// `Bytes` is `Arc`-backed). Returns `false` only if EVERY sender is closed.
151fn fan_out(
152    senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
153    frame: VideoFrame,
154    rt: &tokio::runtime::Handle,
155) -> Result<bool> {
156    let mut any_alive = false;
157    for (idx, sender) in senders.iter().enumerate() {
158        let frame_clone = frame.clone();
159        let sender = sender.clone();
160        let accepted = rt.block_on(async move { sender.send(frame_clone).await });
161        match accepted {
162            Ok(()) => any_alive = true,
163            Err(_) => {
164                tracing::warn!(rung_idx = idx, "shared decode pump: rung dropped its receiver");
165            }
166        }
167    }
168    Ok(any_alive)
169}