1use std::path::Path;
6
7use anyhow::{Context, Result, anyhow, bail};
8
9use codec::codec_strings::{av1_codec_string, avc_codec_string, hevc_codec_string};
10use codec::encode::EncodedPacket;
11use codec::pixel_format::{H264SpsInfo, HevcSpsInfo, parse_av1_sequence_header};
12use container::cmaf::{CmafAudioMuxer, CmafTrackManifest, CmafVideoMuxer, SegmentInfo};
13
/// Number of frames between keyframes so that one keyframe interval spans one segment.
///
/// The product `segment_duration_seconds * frame_rate` is rounded to the nearest whole
/// frame count. The result is never below 1.
pub fn keyframe_interval_for_segment(segment_duration_seconds: f64, frame_rate: f64) -> u32 {
    let frames_per_segment = (segment_duration_seconds * frame_rate).round();
    // Float-to-int `as` saturates: NaN and negatives become 0, huge values become u32::MAX.
    let interval = frames_per_segment as u32;
    if interval == 0 {
        1
    } else {
        interval
    }
}
18
/// Number of segments needed to hold `total_input_frames` frames when each segment
/// holds `keyframe_interval` frames (the final segment may be shorter).
///
/// Returns 0 when either argument is 0. Counts that do not fit in `u32` saturate to
/// `u32::MAX`.
pub fn total_segments_for_rung(total_input_frames: u64, keyframe_interval: u32) -> u32 {
    match (total_input_frames, keyframe_interval) {
        (0, _) | (_, 0) => 0,
        (frames, interval) => {
            let segment_count = frames.div_ceil(u64::from(interval));
            u32::try_from(segment_count).unwrap_or(u32::MAX)
        }
    }
}
28
29pub fn add_packet_with_segment_flush(
33 muxer: &mut CmafVideoMuxer,
34 packet: &EncodedPacket,
35 duration_ticks: u32,
36 segment_target_ticks: u64,
37) -> Result<()> {
38 if packet.is_keyframe
39 && muxer.pending_duration_ticks() >= segment_target_ticks
40 && muxer.first_pending_is_keyframe()
41 {
42 muxer.flush_segment().context("flush CMAF video segment")?;
43 }
44 muxer.add_packet(packet.data.to_vec(), duration_ticks, packet.is_keyframe)?;
45 Ok(())
46}
47
48pub fn add_audio_sample_with_segment_flush(
51 muxer: &mut CmafAudioMuxer,
52 payload: Vec<u8>,
53 duration_ticks: u32,
54 segment_target_ticks: u64,
55) -> Result<()> {
56 if muxer.pending_duration_ticks() >= segment_target_ticks {
57 muxer.flush_segment().context("flush CMAF audio segment")?;
58 }
59 muxer.add_packet(payload, duration_ticks)?;
60 Ok(())
61}
62
/// One contributor's share of a rung's output: identifying properties of the rung plus
/// the CMAF track manifest listing the segments this contributor produced.
///
/// [`merge_rung_contributions`] requires `width`, `height`, `relative_dir` and
/// `manifest.timescale` to be equal across every contribution it merges.
#[derive(Debug, Clone)]
pub struct RungContribution {
    /// Rung width (presumably in pixels; confirm against the producer).
    pub width: u32,
    /// Rung height (presumably in pixels; confirm against the producer).
    pub height: u32,
    /// Directory for this rung's output, e.g. `"video/720p"` in the tests
    /// (presumably relative to the package root; confirm against callers).
    pub relative_dir: String,
    /// Init segment path, segment list and timescale for this contribution.
    pub manifest: CmafTrackManifest,
}
71
72pub fn merge_rung_contributions(contributions: Vec<RungContribution>) -> Result<RungContribution> {
75 if contributions.is_empty() {
76 bail!("merge_rung_contributions: at least one contribution required");
77 }
78 let first = &contributions[0];
79 let width = first.width;
80 let height = first.height;
81 let relative_dir = first.relative_dir.clone();
82 let timescale = first.manifest.timescale;
83 let init_path = first.manifest.init_path.clone();
84
85 for c in &contributions[1..] {
86 if c.width != width || c.height != height {
87 bail!(
88 "contributors disagree on dimensions: first={width}x{height}, other={}x{}",
89 c.width,
90 c.height
91 );
92 }
93 if c.relative_dir != relative_dir {
94 bail!("contributors disagree on relative_dir");
95 }
96 if c.manifest.timescale != timescale {
97 bail!("contributors disagree on timescale");
98 }
99 }
100
101 let mut all_segments: Vec<SegmentInfo> = contributions
102 .into_iter()
103 .flat_map(|c| c.manifest.segments)
104 .collect();
105 all_segments.sort_by_key(|s| s.sequence_number);
106
107 for w in all_segments.windows(2) {
108 if w[0].sequence_number == w[1].sequence_number {
109 bail!(
110 "duplicate segment number {} in merged manifest (paths: {:?}, {:?})",
111 w[0].sequence_number,
112 w[0].path,
113 w[1].path
114 );
115 }
116 }
117 if let (Some(first), Some(last)) = (all_segments.first(), all_segments.last()) {
118 let expected = last.sequence_number - first.sequence_number + 1;
119 if all_segments.len() as u32 != expected {
120 bail!(
121 "internal gap in merged segments: range {}..={} expects {} segments, got {}",
122 first.sequence_number,
123 last.sequence_number,
124 expected,
125 all_segments.len()
126 );
127 }
128 }
129
130 Ok(RungContribution {
131 width,
132 height,
133 relative_dir,
134 manifest: CmafTrackManifest {
135 init_path,
136 segments: all_segments,
137 timescale,
138 },
139 })
140}
141
142pub fn measure_bandwidth(manifest: &CmafTrackManifest) -> (u32, u32) {
144 if manifest.segments.is_empty() {
145 return (0, 0);
146 }
147 let total_bytes: u64 = manifest.segments.iter().map(|s| s.byte_size).sum();
148 let total_ticks: u64 = manifest.segments.iter().map(|s| s.duration_ticks).sum();
149 let total_seconds = total_ticks as f64 / manifest.timescale.max(1) as f64;
150 let avg_bps = if total_seconds > 0.0 {
151 ((total_bytes as f64 * 8.0) / total_seconds) as u32
152 } else {
153 0
154 };
155 let mut peak_bps: u32 = 0;
156 for seg in &manifest.segments {
157 let secs = seg.duration_ticks as f64 / manifest.timescale.max(1) as f64;
158 if secs > 0.0 {
159 let bps = ((seg.byte_size as f64 * 8.0) / secs) as u32;
160 peak_bps = peak_bps.max(bps);
161 }
162 }
163 (avg_bps, peak_bps.max(avg_bps))
164}
165
166pub fn codec_string_from_init(init_path: &Path) -> Result<String> {
170 let bytes = std::fs::read(init_path)
171 .with_context(|| format!("reading init segment {}", init_path.display()))?;
172 let entries =
173 stsd_sample_entries(&bytes).ok_or_else(|| anyhow!("stsd entries not found in init"))?;
174 if entries.len() < 8 {
175 bail!("init segment sample entry truncated");
176 }
177 let fourcc: [u8; 4] = entries[4..8].try_into().unwrap();
178 let entry = find_box(entries, &fourcc).ok_or_else(|| anyhow!("sample entry box not found"))?;
179 let children = entry.get(8 + 78..).unwrap_or(&[]);
182 let fcc_str = std::str::from_utf8(&fourcc).unwrap_or("");
183 match &fourcc {
184 b"av01" => {
185 let av1c = find_box(children, b"av1C").ok_or_else(|| anyhow!("av1C box missing"))?;
186 let obus = av1c.get(8 + 4..).ok_or_else(|| anyhow!("av1C truncated"))?;
187 let seq = parse_av1_sequence_header(obus)
188 .ok_or_else(|| anyhow!("could not parse AV1 sequence header from av1C"))?;
189 Ok(av1_codec_string(&seq))
190 }
191 b"avc1" | b"avc3" => {
192 let avcc = find_box(children, b"avcC").ok_or_else(|| anyhow!("avcC box missing"))?;
193 let body = avcc.get(8..).ok_or_else(|| anyhow!("avcC truncated"))?;
195 if body.len() < 4 {
196 bail!("avcC profile/level truncated");
197 }
198 let sps = H264SpsInfo {
199 profile_idc: body[1],
200 constraint_set_flags: body[2],
201 level_idc: body[3],
202 ..Default::default()
203 };
204 Ok(avc_codec_string(fcc_str, &sps))
205 }
206 b"hvc1" | b"hev1" => {
207 let hvcc = find_box(children, b"hvcC").ok_or_else(|| anyhow!("hvcC box missing"))?;
208 let body = hvcc.get(8..).ok_or_else(|| anyhow!("hvcC truncated"))?;
211 if body.len() < 13 {
212 bail!("hvcC profile-tier-level truncated");
213 }
214 let b1 = body[1];
215 let constraint = ((body[6] as u64) << 40)
216 | ((body[7] as u64) << 32)
217 | ((body[8] as u64) << 24)
218 | ((body[9] as u64) << 16)
219 | ((body[10] as u64) << 8)
220 | (body[11] as u64);
221 let sps = HevcSpsInfo {
222 general_profile_space: b1 >> 6,
223 tier_flag: (b1 >> 5) & 1 == 1,
224 profile_idc: b1 & 0x1F,
225 profile_compatibility_flags: u32::from_be_bytes([body[2], body[3], body[4], body[5]]),
226 general_constraint_flags: constraint,
227 level_idc: body[12],
228 ..Default::default()
229 };
230 Ok(hevc_codec_string(fcc_str, &sps))
231 }
232 other => bail!("unsupported video sample entry fourcc {other:?} in init segment"),
233 }
234}
235
236fn stsd_sample_entries(buf: &[u8]) -> Option<&[u8]> {
237 let moov = find_box(buf, b"moov")?;
238 let trak = find_child_box(moov, b"trak")?;
239 let mdia = find_child_box(trak, b"mdia")?;
240 let minf = find_child_box(mdia, b"minf")?;
241 let stbl = find_child_box(minf, b"stbl")?;
242 let stsd = find_child_box(stbl, b"stsd")?;
243 if stsd.len() < 16 {
244 return None;
245 }
246 Some(&stsd[16..])
248}
249
250fn find_child_box<'a>(parent: &'a [u8], box_type: &[u8; 4]) -> Option<&'a [u8]> {
251 if parent.len() < 8 {
252 return None;
253 }
254 find_box(&parent[8..], box_type)
255}
256
/// Scans a run of sibling boxes in `buf` and returns the first one whose type equals
/// `box_type`, as a slice covering the whole box (header included).
///
/// Each box is read as a 4-byte big-endian size followed by a 4-byte type. The scan
/// stops with `None` when a box declares a size below 8 or one that runs past the end
/// of `buf`; sizes 0 and 1 are therefore not supported. Trailing bytes too short to
/// hold a header are ignored.
fn find_box<'a>(buf: &'a [u8], box_type: &[u8; 4]) -> Option<&'a [u8]> {
    const BOX_HEADER_LEN: usize = 8;

    let mut remaining = buf;
    while remaining.len() >= BOX_HEADER_LEN {
        let declared =
            u32::from_be_bytes([remaining[0], remaining[1], remaining[2], remaining[3]]);
        let size = usize::try_from(declared).ok()?;
        // Compare against the remaining length instead of computing `pos + size`, which
        // could overflow `usize` on 32-bit targets for a hostile size field.
        if size < BOX_HEADER_LEN || size > remaining.len() {
            return None;
        }
        let (current, rest) = remaining.split_at(size);
        if &current[4..8] == box_type {
            return Some(current);
        }
        remaining = rest;
    }
    None
}
272
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn keyframe_interval_rounds_and_clamps() {
        assert_eq!(keyframe_interval_for_segment(2.0, 24.0), 48);
        // A zero-length segment still yields an interval of at least one frame.
        assert_eq!(keyframe_interval_for_segment(0.0, 30.0), 1);
    }

    #[test]
    fn total_segments_ceil() {
        assert_eq!(total_segments_for_rung(100, 48), 3);
        assert_eq!(total_segments_for_rung(96, 48), 2);
        assert_eq!(total_segments_for_rung(0, 48), 0);
        assert_eq!(total_segments_for_rung(100, 0), 0);
    }

    /// Builds a 1280x720 contribution holding segments `start..=end`, each 1024 bytes
    /// and 3000 ticks long at a timescale of 30000.
    fn contribution(start: u32, end: u32) -> RungContribution {
        let segments = (start..=end)
            .map(|s| SegmentInfo {
                sequence_number: s,
                path: format!("/tmp/seg-{s:05}.m4s").into(),
                byte_size: 1024,
                duration_ticks: 3000,
            })
            .collect();
        RungContribution {
            width: 1280,
            height: 720,
            relative_dir: "video/720p".into(),
            manifest: CmafTrackManifest {
                init_path: "/tmp/init.mp4".into(),
                segments,
                timescale: 30000,
            },
        }
    }

    // Contributions supplied out of order come back sorted by sequence number.
    #[test]
    fn merge_orders_segments() {
        let merged = merge_rung_contributions(vec![contribution(3, 5), contribution(1, 2)]).unwrap();
        let seqs: Vec<u32> = merged.manifest.segments.iter().map(|s| s.sequence_number).collect();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }

    // Overlapping contributions are rejected rather than deduplicated.
    #[test]
    fn merge_detects_duplicate() {
        assert!(merge_rung_contributions(vec![contribution(1, 3), contribution(3, 4)]).is_err());
    }

    #[test]
    fn merge_detects_gap() {
        assert!(merge_rung_contributions(vec![contribution(1, 2), contribution(4, 5)]).is_err());
    }

    #[test]
    fn merge_rejects_empty_input() {
        assert!(merge_rung_contributions(Vec::new()).is_err());
    }

    #[test]
    fn merge_rejects_mismatched_dimensions() {
        let mut other = contribution(3, 4);
        other.width = 1920;
        assert!(merge_rung_contributions(vec![contribution(1, 2), other]).is_err());
    }

    #[test]
    fn bandwidth_nonzero() {
        let c = contribution(1, 4);
        let (avg, peak) = measure_bandwidth(&c.manifest);
        assert!(avg > 0);
        assert!(peak >= avg);
    }

    #[test]
    fn bandwidth_of_empty_manifest_is_zero() {
        let mut c = contribution(1, 1);
        c.manifest.segments.clear();
        assert_eq!(measure_bandwidth(&c.manifest), (0, 0));
    }
}
325}