use anyhow::{Context, Result};
use bytes::Bytes;
use codec::frame::{ColorMetadata, PixelFormat, VideoFrame};
use codec::{colorspace, decode};
use container::streaming;
#[derive(Clone)]
pub struct DecodePumpConfig {
pub codec_name: String,
pub info_for_decoder: codec::frame::StreamInfo,
pub source_color_metadata: ColorMetadata,
pub source_pixel_format: PixelFormat,
pub needs_downsample: bool,
pub tonemap_to_sdr: bool,
pub gpu_index: Option<u32>,
pub filters: std::sync::Arc<codec::filter::FilterChain>,
}
pub fn run_shared_decode_pump_blocking(
cfg: DecodePumpConfig,
input_data: Bytes,
senders: Vec<tokio::sync::mpsc::Sender<VideoFrame>>,
rt: tokio::runtime::Handle,
) -> Result<u64> {
let outcome = decode_loop(&cfg, input_data, &senders, &rt);
drop(senders);
outcome
}
fn decode_loop(
cfg: &DecodePumpConfig,
input_data: Bytes,
senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
rt: &tokio::runtime::Handle,
) -> Result<u64> {
let mut demuxer =
streaming::demux_streaming(&input_data).context("demuxing input for shared decode pump")?;
let mut decoder =
decode::create_decoder_on(&cfg.codec_name, cfg.info_for_decoder.clone(), cfg.gpu_index)
.context("creating decoder for shared decode pump")?;
let mut frames_pushed: u64 = 0;
'outer: loop {
match demuxer
.next_video_sample()
.context("demuxing next video sample in shared decode pump")?
{
Some(sample) => {
decoder
.push_sample(&sample.data)
.context("pushing sample to shared decode pump decoder")?;
while let Some(frame) = decoder
.decode_next()
.context("decoding frame in shared decode pump")?
{
let normalized = normalize_frame(cfg, frame)?;
if !fan_out(senders, normalized, rt)? {
break 'outer;
}
frames_pushed += 1;
}
}
None => {
decoder
.finish()
.context("decoder finish in shared decode pump")?;
while let Some(frame) = decoder
.decode_next()
.context("decoding frame after finish in shared decode pump")?
{
let normalized = normalize_frame(cfg, frame)?;
if !fan_out(senders, normalized, rt)? {
break;
}
frames_pushed += 1;
}
break;
}
}
}
Ok(frames_pushed)
}
fn normalize_frame(cfg: &DecodePumpConfig, frame: VideoFrame) -> Result<VideoFrame> {
let downsampled = if cfg.needs_downsample {
colorspace::downsample_444_to_420_frame(&frame)
.context("shared decode pump 4:4:4 → 4:2:0 downsample")?
} else {
frame
};
let normalized = if !cfg.tonemap_to_sdr {
downsampled
} else {
colorspace::convert_to_sdr_bt709(&downsampled, &cfg.source_color_metadata)
.context("shared decode pump colorspace convert (HDR-aware)")?
};
if cfg.filters.is_empty() {
Ok(normalized)
} else {
cfg.filters.apply(normalized).context("shared decode pump video filters")
}
}
fn fan_out(
senders: &[tokio::sync::mpsc::Sender<VideoFrame>],
frame: VideoFrame,
rt: &tokio::runtime::Handle,
) -> Result<bool> {
let mut any_alive = false;
for (idx, sender) in senders.iter().enumerate() {
let frame_clone = frame.clone();
let sender = sender.clone();
let accepted = rt.block_on(async move { sender.send(frame_clone).await });
match accepted {
Ok(()) => any_alive = true,
Err(_) => {
tracing::warn!(rung_idx = idx, "shared decode pump: rung dropped its receiver");
}
}
}
Ok(any_alive)
}