1use anyhow::{Context, Result};
14use bytes::Bytes;
15
16use codec::frame::{ColorMetadata, PixelFormat, VideoFrame};
17use codec::{colorspace, decode};
18use container::streaming;
19
20#[derive(Clone)]
22pub struct DecodePumpConfig {
23 pub codec_name: String,
25 pub info_for_decoder: codec::frame::StreamInfo,
27 pub source_color_metadata: ColorMetadata,
29 pub source_pixel_format: PixelFormat,
31 pub needs_downsample: bool,
33 pub tonemap_to_sdr: bool,
39 pub gpu_index: Option<u32>,
41}
42
43pub fn run_shared_decode_pump_blocking(
50 cfg: DecodePumpConfig,
51 input_data: Bytes,
52 senders: Vec<tokio::sync::mpsc::Sender<VideoFrame>>,
53 rt: tokio::runtime::Handle,
54) -> Result<u64> {
55 let outcome = decode_loop(&cfg, input_data, &senders, &rt);
56 drop(senders);
58 outcome
59}
60
61fn decode_loop(
62 cfg: &DecodePumpConfig,
63 input_data: Bytes,
64 senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
65 rt: &tokio::runtime::Handle,
66) -> Result<u64> {
67 let mut demuxer =
68 streaming::demux_streaming(&input_data).context("demuxing input for shared decode pump")?;
69 let mut decoder =
70 decode::create_decoder_on(&cfg.codec_name, cfg.info_for_decoder.clone(), cfg.gpu_index)
71 .context("creating decoder for shared decode pump")?;
72
73 let mut frames_pushed: u64 = 0;
74 'outer: loop {
75 match demuxer
76 .next_video_sample()
77 .context("demuxing next video sample in shared decode pump")?
78 {
79 Some(sample) => {
80 decoder
81 .push_sample(&sample.data)
82 .context("pushing sample to shared decode pump decoder")?;
83 while let Some(frame) = decoder
84 .decode_next()
85 .context("decoding frame in shared decode pump")?
86 {
87 let normalized = normalize_frame(cfg, frame)?;
88 if !fan_out(senders, normalized, rt)? {
89 break 'outer;
90 }
91 frames_pushed += 1;
92 }
93 }
94 None => {
95 decoder
96 .finish()
97 .context("decoder finish in shared decode pump")?;
98 while let Some(frame) = decoder
99 .decode_next()
100 .context("decoding frame after finish in shared decode pump")?
101 {
102 let normalized = normalize_frame(cfg, frame)?;
103 if !fan_out(senders, normalized, rt)? {
104 break;
105 }
106 frames_pushed += 1;
107 }
108 break;
109 }
110 }
111 }
112
113 Ok(frames_pushed)
114}
115
116fn normalize_frame(cfg: &DecodePumpConfig, frame: VideoFrame) -> Result<VideoFrame> {
122 let downsampled = if cfg.needs_downsample {
123 colorspace::downsample_444_to_420_frame(&frame)
124 .context("shared decode pump 4:4:4 → 4:2:0 downsample")?
125 } else {
126 frame
127 };
128 if !cfg.tonemap_to_sdr {
129 return Ok(downsampled);
131 }
132 let normalized = colorspace::convert_to_sdr_bt709(&downsampled, &cfg.source_color_metadata)
133 .context("shared decode pump colorspace convert (HDR-aware)")?;
134 Ok(normalized)
135}
136
137fn fan_out(
140 senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
141 frame: VideoFrame,
142 rt: &tokio::runtime::Handle,
143) -> Result<bool> {
144 let mut any_alive = false;
145 for (idx, sender) in senders.iter().enumerate() {
146 let frame_clone = frame.clone();
147 let sender = sender.clone();
148 let accepted = rt.block_on(async move { sender.send(frame_clone).await });
149 match accepted {
150 Ok(()) => any_alive = true,
151 Err(_) => {
152 tracing::warn!(rung_idx = idx, "shared decode pump: rung dropped its receiver");
153 }
154 }
155 }
156 Ok(any_alive)
157}