Skip to main content

rivet/
decode_pump.rs

//! Shared source decode pump.
//!
//! One pump per job (not per rung): demux + decode the source **once**, run
//! the rung-agnostic per-frame work (4:4:4 → 4:2:0 downsample + HDR tonemap),
//! and fan the normalized frame out to N per-rung mpsc channels via cheap
//! `VideoFrame::clone()` (the inner `Bytes` is `Arc`-backed).
//!
//! Per-rung scaling + encoding consume from those channels. Eliminating the
//! redundant per-rung decode is the whole point — a 5-rung ladder decodes the
//! source once, not five times. The cost: the slowest rung backpressures the
//! pump (usually the largest rung, whose encoder is slowest).
12
13use anyhow::{Context, Result};
14use bytes::Bytes;
15
16use codec::frame::{ColorMetadata, PixelFormat, VideoFrame};
17use codec::{colorspace, decode};
18use container::streaming;
19
20/// Configuration for one decode pump.
21#[derive(Clone)]
22pub struct DecodePumpConfig {
23    /// Source video codec label (e.g. `"h264"`).
24    pub codec_name: String,
25    /// Stream info handed to the decoder.
26    pub info_for_decoder: codec::frame::StreamInfo,
27    /// Source color metadata (drives HDR-aware tonemap vs SDR passthrough).
28    pub source_color_metadata: ColorMetadata,
29    /// Source pixel format.
30    pub source_pixel_format: PixelFormat,
31    /// Whether to run the 4:4:4 → 4:2:0 downsample per frame.
32    pub needs_downsample: bool,
33    /// Tonemap policy (from the [`OutputSpec`](crate::spec::OutputSpec)): when
34    /// `true`, HDR (PQ/HLG) sources are mapped down to 8-bit SDR BT.709; when
35    /// `false`, the source color/transfer/bit-depth passes through unchanged.
36    /// The pump does not decide this on its own — the caller sets it from the
37    /// spec's [`ColorPolicy`](crate::spec::ColorPolicy).
38    pub tonemap_to_sdr: bool,
39    /// Pin the decoder to this physical GPU; `None` = first matching adapter.
40    pub gpu_index: Option<u32>,
41}
42
43/// Blocking decode loop, designed for `tokio::task::spawn_blocking`. Fans
44/// every normalized frame out to all `senders`. If a sender's channel is
45/// closed (its rung gave up) the pump continues with the rest; it stops only
46/// when *every* sender is closed. `rt` bridges into the async `send().await`.
47///
48/// Returns the number of frames pushed.
49pub fn run_shared_decode_pump_blocking(
50    cfg: DecodePumpConfig,
51    input_data: Bytes,
52    senders: Vec<tokio::sync::mpsc::Sender<VideoFrame>>,
53    rt: tokio::runtime::Handle,
54) -> Result<u64> {
55    let outcome = decode_loop(&cfg, input_data, &senders, &rt);
56    // Drop senders so receivers wake and exit.
57    drop(senders);
58    outcome
59}
60
61fn decode_loop(
62    cfg: &DecodePumpConfig,
63    input_data: Bytes,
64    senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
65    rt: &tokio::runtime::Handle,
66) -> Result<u64> {
67    let mut demuxer =
68        streaming::demux_streaming(&input_data).context("demuxing input for shared decode pump")?;
69    let mut decoder =
70        decode::create_decoder_on(&cfg.codec_name, cfg.info_for_decoder.clone(), cfg.gpu_index)
71            .context("creating decoder for shared decode pump")?;
72
73    let mut frames_pushed: u64 = 0;
74    'outer: loop {
75        match demuxer
76            .next_video_sample()
77            .context("demuxing next video sample in shared decode pump")?
78        {
79            Some(sample) => {
80                decoder
81                    .push_sample(&sample.data)
82                    .context("pushing sample to shared decode pump decoder")?;
83                while let Some(frame) = decoder
84                    .decode_next()
85                    .context("decoding frame in shared decode pump")?
86                {
87                    let normalized = normalize_frame(cfg, frame)?;
88                    if !fan_out(senders, normalized, rt)? {
89                        break 'outer;
90                    }
91                    frames_pushed += 1;
92                }
93            }
94            None => {
95                decoder
96                    .finish()
97                    .context("decoder finish in shared decode pump")?;
98                while let Some(frame) = decoder
99                    .decode_next()
100                    .context("decoding frame after finish in shared decode pump")?
101                {
102                    let normalized = normalize_frame(cfg, frame)?;
103                    if !fan_out(senders, normalized, rt)? {
104                        break;
105                    }
106                    frames_pushed += 1;
107                }
108                break;
109            }
110        }
111    }
112
113    Ok(frames_pushed)
114}
115
116/// Rung-agnostic per-frame work: 4:4:4 → 4:2:0 downsample (if needed) then,
117/// when the spec's color policy asks for it (`tonemap_to_sdr`), an HDR-aware
118/// colorspace convert (tonemap PQ/HLG → SDR BT.709, identity for SDR). When the
119/// policy is passthrough/HDR, the downsampled source is forwarded unchanged.
120/// Per-rung scaling is NOT done here.
121fn normalize_frame(cfg: &DecodePumpConfig, frame: VideoFrame) -> Result<VideoFrame> {
122    let downsampled = if cfg.needs_downsample {
123        colorspace::downsample_444_to_420_frame(&frame)
124            .context("shared decode pump 4:4:4 → 4:2:0 downsample")?
125    } else {
126        frame
127    };
128    if !cfg.tonemap_to_sdr {
129        // Passthrough / HDR output: preserve the source color + bit depth.
130        return Ok(downsampled);
131    }
132    let normalized = colorspace::convert_to_sdr_bt709(&downsampled, &cfg.source_color_metadata)
133        .context("shared decode pump colorspace convert (HDR-aware)")?;
134    Ok(normalized)
135}
136
137/// Fan one frame out to every sender. Cloning `VideoFrame` is cheap (inner
138/// `Bytes` is `Arc`-backed). Returns `false` only if EVERY sender is closed.
139fn fan_out(
140    senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
141    frame: VideoFrame,
142    rt: &tokio::runtime::Handle,
143) -> Result<bool> {
144    let mut any_alive = false;
145    for (idx, sender) in senders.iter().enumerate() {
146        let frame_clone = frame.clone();
147        let sender = sender.clone();
148        let accepted = rt.block_on(async move { sender.send(frame_clone).await });
149        match accepted {
150            Ok(()) => any_alive = true,
151            Err(_) => {
152                tracing::warn!(rung_idx = idx, "shared decode pump: rung dropped its receiver");
153            }
154        }
155    }
156    Ok(any_alive)
157}