Skip to main content

rivet/
cmaf_util.rs

1//! Shared CMAF/HLS helpers used by the job engine and the multi-GPU
2//! orchestrator: segment-boundary flushing, per-rung contribution merging,
3//! bandwidth measurement, and AV1 codec-string extraction.
4
5use std::path::Path;
6
7use anyhow::{Context, Result, anyhow, bail};
8
9use codec::codec_strings::av1_codec_string;
10use codec::encode::EncodedPacket;
11use codec::pixel_format::parse_av1_sequence_header;
12use container::cmaf::{CmafAudioMuxer, CmafTrackManifest, CmafVideoMuxer, SegmentInfo};
13
/// Computes the keyframe interval, in frames, that makes one GOP span a target
/// segment length at the given frame rate.
///
/// The product `segment_duration_seconds * frame_rate` is rounded to the
/// nearest whole frame. The result is never zero: a product that converts to
/// zero (including negative or NaN products, which the float-to-integer cast
/// maps to zero) yields `1`. Products above `u32::MAX` saturate at `u32::MAX`.
pub fn keyframe_interval_for_segment(segment_duration_seconds: f64, frame_rate: f64) -> u32 {
    let frames = (segment_duration_seconds * frame_rate).round();
    // `as` saturates for out-of-range floats and turns NaN into 0.
    match frames as u32 {
        0 => 1,
        interval => interval,
    }
}
18
/// Returns how many CMAF segments a rung produces when one segment holds
/// `keyframe_interval` frames, rounding a trailing partial segment up.
///
/// Yields `0` when either the frame count or the interval is zero, and
/// saturates at `u32::MAX` if the quotient does not fit in a `u32`.
pub fn total_segments_for_rung(total_input_frames: u64, keyframe_interval: u32) -> u32 {
    match (total_input_frames, u64::from(keyframe_interval)) {
        // Nothing to encode, or no usable interval to divide by.
        (0, _) | (_, 0) => 0,
        (frames, frames_per_segment) => {
            let count = frames.div_ceil(frames_per_segment);
            // Clamp first so the narrowing cast below cannot truncate.
            count.min(u64::from(u32::MAX)) as u32
        }
    }
}
28
29/// Add one encoded video packet to a [`CmafVideoMuxer`], flushing the prior
30/// segment first when the next packet is a keyframe and the buffered duration
31/// has reached the segment target (so each segment opens on an IDR).
32pub fn add_packet_with_segment_flush(
33    muxer: &mut CmafVideoMuxer,
34    packet: &EncodedPacket,
35    duration_ticks: u32,
36    segment_target_ticks: u64,
37) -> Result<()> {
38    if packet.is_keyframe
39        && muxer.pending_duration_ticks() >= segment_target_ticks
40        && muxer.first_pending_is_keyframe()
41    {
42        muxer.flush_segment().context("flush CMAF video segment")?;
43    }
44    muxer.add_packet(packet.data.to_vec(), duration_ticks, packet.is_keyframe)?;
45    Ok(())
46}
47
48/// Add one audio sample to a [`CmafAudioMuxer`] with segment flushing on the
49/// same time grid.
50pub fn add_audio_sample_with_segment_flush(
51    muxer: &mut CmafAudioMuxer,
52    payload: Vec<u8>,
53    duration_ticks: u32,
54    segment_target_ticks: u64,
55) -> Result<()> {
56    if muxer.pending_duration_ticks() >= segment_target_ticks {
57        muxer.flush_segment().context("flush CMAF audio segment")?;
58    }
59    muxer.add_packet(payload, duration_ticks)?;
60    Ok(())
61}
62
/// One encoder worker's contribution to a rung (a slice of its segments).
///
/// Several contributions for the same rung are combined by
/// [`merge_rung_contributions`], which requires `width`, `height`,
/// `relative_dir` and the manifest timescale to agree across all of them.
#[derive(Debug, Clone)]
pub struct RungContribution {
    /// Rendition width (presumably in pixels — confirm with the producer).
    pub width: u32,
    /// Rendition height (presumably in pixels — confirm with the producer).
    pub height: u32,
    /// Directory identifier for this rung's output; looks like a path relative
    /// to some output root (e.g. `"video/720p"` in the tests) — confirm.
    pub relative_dir: String,
    /// Init-segment path, timescale and the segments this worker produced.
    pub manifest: CmafTrackManifest,
}
71
72/// Merge several workers' segment lists for one rung into a single ordered
73/// manifest, detecting duplicate segment numbers and internal gaps.
74pub fn merge_rung_contributions(contributions: Vec<RungContribution>) -> Result<RungContribution> {
75    if contributions.is_empty() {
76        bail!("merge_rung_contributions: at least one contribution required");
77    }
78    let first = &contributions[0];
79    let width = first.width;
80    let height = first.height;
81    let relative_dir = first.relative_dir.clone();
82    let timescale = first.manifest.timescale;
83    let init_path = first.manifest.init_path.clone();
84
85    for c in &contributions[1..] {
86        if c.width != width || c.height != height {
87            bail!(
88                "contributors disagree on dimensions: first={width}x{height}, other={}x{}",
89                c.width,
90                c.height
91            );
92        }
93        if c.relative_dir != relative_dir {
94            bail!("contributors disagree on relative_dir");
95        }
96        if c.manifest.timescale != timescale {
97            bail!("contributors disagree on timescale");
98        }
99    }
100
101    let mut all_segments: Vec<SegmentInfo> = contributions
102        .into_iter()
103        .flat_map(|c| c.manifest.segments)
104        .collect();
105    all_segments.sort_by_key(|s| s.sequence_number);
106
107    for w in all_segments.windows(2) {
108        if w[0].sequence_number == w[1].sequence_number {
109            bail!(
110                "duplicate segment number {} in merged manifest (paths: {:?}, {:?})",
111                w[0].sequence_number,
112                w[0].path,
113                w[1].path
114            );
115        }
116    }
117    if let (Some(first), Some(last)) = (all_segments.first(), all_segments.last()) {
118        let expected = last.sequence_number - first.sequence_number + 1;
119        if all_segments.len() as u32 != expected {
120            bail!(
121                "internal gap in merged segments: range {}..={} expects {} segments, got {}",
122                first.sequence_number,
123                last.sequence_number,
124                expected,
125                all_segments.len()
126            );
127        }
128    }
129
130    Ok(RungContribution {
131        width,
132        height,
133        relative_dir,
134        manifest: CmafTrackManifest {
135            init_path,
136            segments: all_segments,
137            timescale,
138        },
139    })
140}
141
142/// (average, peak) bandwidth in bits/sec across a manifest's segments.
143pub fn measure_bandwidth(manifest: &CmafTrackManifest) -> (u32, u32) {
144    if manifest.segments.is_empty() {
145        return (0, 0);
146    }
147    let total_bytes: u64 = manifest.segments.iter().map(|s| s.byte_size).sum();
148    let total_ticks: u64 = manifest.segments.iter().map(|s| s.duration_ticks).sum();
149    let total_seconds = total_ticks as f64 / manifest.timescale.max(1) as f64;
150    let avg_bps = if total_seconds > 0.0 {
151        ((total_bytes as f64 * 8.0) / total_seconds) as u32
152    } else {
153        0
154    };
155    let mut peak_bps: u32 = 0;
156    for seg in &manifest.segments {
157        let secs = seg.duration_ticks as f64 / manifest.timescale.max(1) as f64;
158        if secs > 0.0 {
159            let bps = ((seg.byte_size as f64 * 8.0) / secs) as u32;
160            peak_bps = peak_bps.max(bps);
161        }
162    }
163    (avg_bps, peak_bps.max(avg_bps))
164}
165
166/// Parse the AV1 codec string (`av01.…`) from a rendition's init segment.
167pub fn av1_codec_string_from_init(init_path: &Path) -> Result<String> {
168    let bytes = std::fs::read(init_path)
169        .with_context(|| format!("reading init segment {}", init_path.display()))?;
170    let obus = find_av1c_config_obus(&bytes)
171        .ok_or_else(|| anyhow!("av1C box not found in init segment"))?;
172    let seq = parse_av1_sequence_header(obus)
173        .ok_or_else(|| anyhow!("could not parse AV1 sequence header from av1C"))?;
174    Ok(av1_codec_string(&seq))
175}
176
177fn find_av1c_config_obus(buf: &[u8]) -> Option<&[u8]> {
178    let moov = find_box(buf, b"moov")?;
179    let trak = find_child_box(moov, b"trak")?;
180    let mdia = find_child_box(trak, b"mdia")?;
181    let minf = find_child_box(mdia, b"minf")?;
182    let stbl = find_child_box(minf, b"stbl")?;
183    let stsd = find_child_box(stbl, b"stsd")?;
184    if stsd.len() < 16 {
185        return None;
186    }
187    let after_header_and_count = &stsd[8 + 8..];
188    let av01 = find_box(after_header_and_count, b"av01")?;
189    if av01.len() < 8 + 78 {
190        return None;
191    }
192    let av01_children = &av01[8 + 78..];
193    let av1c = find_box(av01_children, b"av1C")?;
194    if av1c.len() < 8 + 4 {
195        return None;
196    }
197    Some(&av1c[8 + 4..])
198}
199
200fn find_child_box<'a>(parent: &'a [u8], box_type: &[u8; 4]) -> Option<&'a [u8]> {
201    if parent.len() < 8 {
202        return None;
203    }
204    find_box(&parent[8..], box_type)
205}
206
/// Scans a run of consecutive boxes in `buf` and returns the first one whose
/// four-byte type equals `box_type`, header included.
///
/// Each box starts with a big-endian `u32` size followed by its type. The scan
/// gives up with `None` as soon as it meets a box whose declared size is
/// smaller than the 8-byte header or runs past the end of `buf`, and also when
/// the boxes are exhausted without a match.
fn find_box<'a>(buf: &'a [u8], box_type: &[u8; 4]) -> Option<&'a [u8]> {
    let mut remaining = buf;
    while remaining.len() >= 8 {
        let declared =
            u32::from_be_bytes([remaining[0], remaining[1], remaining[2], remaining[3]]) as usize;
        if declared < 8 || declared > remaining.len() {
            return None;
        }
        let (candidate, tail) = remaining.split_at(declared);
        if candidate[4..8] == box_type[..] {
            return Some(candidate);
        }
        remaining = tail;
    }
    None
}
222
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keyframe_interval_rounds_and_never_returns_zero() {
        assert_eq!(keyframe_interval_for_segment(2.0, 24.0), 48);
        assert_eq!(keyframe_interval_for_segment(0.0, 30.0), 1);
    }

    #[test]
    fn total_segments_ceil() {
        assert_eq!(total_segments_for_rung(100, 48), 3);
        assert_eq!(total_segments_for_rung(96, 48), 2);
        assert_eq!(total_segments_for_rung(0, 48), 0);
        assert_eq!(total_segments_for_rung(100, 0), 0);
    }

    /// Builds a 1280x720 contribution holding segments `start..=end`, each
    /// 1024 bytes and 3000 ticks long at a 30000 timescale.
    fn contribution(start: u32, end: u32) -> RungContribution {
        let segments = (start..=end)
            .map(|s| SegmentInfo {
                sequence_number: s,
                path: format!("/tmp/seg-{s:05}.m4s").into(),
                byte_size: 1024,
                duration_ticks: 3000,
            })
            .collect();
        RungContribution {
            width: 1280,
            height: 720,
            relative_dir: "video/720p".into(),
            manifest: CmafTrackManifest {
                init_path: "/tmp/init.mp4".into(),
                segments,
                timescale: 30000,
            },
        }
    }

    // Despite the name, merging never drops duplicates (they are an error,
    // see `merge_detects_duplicate`); this checks ordering of disjoint inputs.
    #[test]
    fn merge_orders_and_dedups() {
        let merged = merge_rung_contributions(vec![contribution(3, 5), contribution(1, 2)]).unwrap();
        let seqs: Vec<u32> = merged.manifest.segments.iter().map(|s| s.sequence_number).collect();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn merge_detects_duplicate() {
        assert!(merge_rung_contributions(vec![contribution(1, 3), contribution(3, 4)]).is_err());
    }

    #[test]
    fn merge_detects_gap() {
        // Segment 3 is missing between the two contributions.
        assert!(merge_rung_contributions(vec![contribution(1, 2), contribution(4, 5)]).is_err());
    }

    #[test]
    fn merge_rejects_empty_input() {
        assert!(merge_rung_contributions(Vec::new()).is_err());
    }

    #[test]
    fn bandwidth_nonzero() {
        let c = contribution(1, 4);
        let (avg, peak) = measure_bandwidth(&c.manifest);
        assert!(avg > 0);
        assert!(peak >= avg);
    }
}