rivet/decode_pump.rs
1//! Shared source decode pump.
2//!
//! One pump per job (not per rung): demux + decode the source **once**, run
//! the rung-agnostic per-frame work (optional 4:4:4 → 4:2:0 downsample, HDR
//! tonemap when the spec's color policy requests it, and the video filter
//! chain), and fan the normalized frame out to N per-rung mpsc channels via
//! cheap `VideoFrame::clone()` (the inner `Bytes` is `Arc`-backed).
7//!
8//! Per-rung scaling + encoding consume from those channels. Eliminating the
9//! redundant per-rung decode is the whole point — a 5-rung ladder decodes the
10//! source once, not five times. The cost: the slowest rung backpressures the
11//! pump (usually the largest rung, whose encoder is slowest).
12
13use anyhow::{Context, Result};
14use bytes::Bytes;
15
16use codec::frame::{ColorMetadata, PixelFormat, VideoFrame};
17use codec::{colorspace, decode};
18use container::streaming;
19
20/// Configuration for one decode pump.
21#[derive(Clone)]
22pub struct DecodePumpConfig {
23 /// Source video codec label (e.g. `"h264"`).
24 pub codec_name: String,
25 /// Stream info handed to the decoder.
26 pub info_for_decoder: codec::frame::StreamInfo,
27 /// Source color metadata (drives HDR-aware tonemap vs SDR passthrough).
28 pub source_color_metadata: ColorMetadata,
29 /// Source pixel format.
30 pub source_pixel_format: PixelFormat,
31 /// Whether to run the 4:4:4 → 4:2:0 downsample per frame.
32 pub needs_downsample: bool,
33 /// Tonemap policy (from the [`OutputSpec`](crate::spec::OutputSpec)): when
34 /// `true`, HDR (PQ/HLG) sources are mapped down to 8-bit SDR BT.709; when
35 /// `false`, the source color/transfer/bit-depth passes through unchanged.
36 /// The pump does not decide this on its own — the caller sets it from the
37 /// spec's [`ColorPolicy`](crate::spec::ColorPolicy).
38 pub tonemap_to_sdr: bool,
39 /// Pin the decoder to this physical GPU; `None` = first matching adapter.
40 pub gpu_index: Option<u32>,
41 /// Prepared per-frame video filter chain (crop/pad/flip/rotate/grayscale/
42 /// overlay/colour), applied after colorspace normalize and before the frame
43 /// is fanned out to the per-rung scalers. Overlay images are loaded once at
44 /// prepare time. `Arc` so the per-GPU pump configs clone it cheaply.
45 pub filters: std::sync::Arc<codec::filter::FilterChain>,
46}
47
48/// Blocking decode loop, designed for `tokio::task::spawn_blocking`. Fans
49/// every normalized frame out to all `senders`. If a sender's channel is
50/// closed (its rung gave up) the pump continues with the rest; it stops only
51/// when *every* sender is closed. `rt` bridges into the async `send().await`.
52///
53/// Returns the number of frames pushed.
54pub fn run_shared_decode_pump_blocking(
55 cfg: DecodePumpConfig,
56 input_data: Bytes,
57 senders: Vec<tokio::sync::mpsc::Sender<VideoFrame>>,
58 rt: tokio::runtime::Handle,
59) -> Result<u64> {
60 let outcome = decode_loop(&cfg, input_data, &senders, &rt);
61 // Drop senders so receivers wake and exit.
62 drop(senders);
63 outcome
64}
65
66fn decode_loop(
67 cfg: &DecodePumpConfig,
68 input_data: Bytes,
69 senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
70 rt: &tokio::runtime::Handle,
71) -> Result<u64> {
72 let mut demuxer =
73 streaming::demux_streaming(&input_data).context("demuxing input for shared decode pump")?;
74 let mut decoder =
75 decode::create_decoder_on(&cfg.codec_name, cfg.info_for_decoder.clone(), cfg.gpu_index)
76 .context("creating decoder for shared decode pump")?;
77
78 let mut frames_pushed: u64 = 0;
79 'outer: loop {
80 match demuxer
81 .next_video_sample()
82 .context("demuxing next video sample in shared decode pump")?
83 {
84 Some(sample) => {
85 decoder
86 .push_sample(&sample.data)
87 .context("pushing sample to shared decode pump decoder")?;
88 while let Some(frame) = decoder
89 .decode_next()
90 .context("decoding frame in shared decode pump")?
91 {
92 let normalized = normalize_frame(cfg, frame)?;
93 if !fan_out(senders, normalized, rt)? {
94 break 'outer;
95 }
96 frames_pushed += 1;
97 }
98 }
99 None => {
100 decoder
101 .finish()
102 .context("decoder finish in shared decode pump")?;
103 while let Some(frame) = decoder
104 .decode_next()
105 .context("decoding frame after finish in shared decode pump")?
106 {
107 let normalized = normalize_frame(cfg, frame)?;
108 if !fan_out(senders, normalized, rt)? {
109 break;
110 }
111 frames_pushed += 1;
112 }
113 break;
114 }
115 }
116 }
117
118 Ok(frames_pushed)
119}
120
121/// Rung-agnostic per-frame work: 4:4:4 → 4:2:0 downsample (if needed) then,
122/// when the spec's color policy asks for it (`tonemap_to_sdr`), an HDR-aware
123/// colorspace convert (tonemap PQ/HLG → SDR BT.709, identity for SDR). When the
124/// policy is passthrough/HDR, the downsampled source is forwarded unchanged.
125/// Per-rung scaling is NOT done here.
126fn normalize_frame(cfg: &DecodePumpConfig, frame: VideoFrame) -> Result<VideoFrame> {
127 let downsampled = if cfg.needs_downsample {
128 colorspace::downsample_444_to_420_frame(&frame)
129 .context("shared decode pump 4:4:4 → 4:2:0 downsample")?
130 } else {
131 frame
132 };
133 let normalized = if !cfg.tonemap_to_sdr {
134 // Passthrough / HDR output: preserve the source color + bit depth.
135 downsampled
136 } else {
137 colorspace::convert_to_sdr_bt709(&downsampled, &cfg.source_color_metadata)
138 .context("shared decode pump colorspace convert (HDR-aware)")?
139 };
140 // Video filters (crop/pad/flip/rotate/grayscale/overlay/colour) run on the
141 // normalized 4:2:0 frame, before the per-rung scalers see it.
142 if cfg.filters.is_empty() {
143 Ok(normalized)
144 } else {
145 cfg.filters.apply(normalized).context("shared decode pump video filters")
146 }
147}
148
149/// Fan one frame out to every sender. Cloning `VideoFrame` is cheap (inner
150/// `Bytes` is `Arc`-backed). Returns `false` only if EVERY sender is closed.
151fn fan_out(
152 senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
153 frame: VideoFrame,
154 rt: &tokio::runtime::Handle,
155) -> Result<bool> {
156 let mut any_alive = false;
157 for (idx, sender) in senders.iter().enumerate() {
158 let frame_clone = frame.clone();
159 let sender = sender.clone();
160 let accepted = rt.block_on(async move { sender.send(frame_clone).await });
161 match accepted {
162 Ok(()) => any_alive = true,
163 Err(_) => {
164 tracing::warn!(rung_idx = idx, "shared decode pump: rung dropped its receiver");
165 }
166 }
167 }
168 Ok(any_alive)
169}