1use std::collections::VecDeque;
16use std::fmt::Write as _;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use super::engine::{SegmentEngine, BANDWIDTH_HINT};
20use super::{Muxer, Packager};
21use crate::traits::StorageBackend;
22use crate::{MediaFrame, Result};
23use async_trait::async_trait;
24use bytes::Bytes;
25
26#[derive(Debug, Clone, Copy)]
28struct TimelineEntry {
29 start_ms: u64,
30 dur_ms: u64,
31}
32
33#[derive(Debug, Clone)]
38pub struct DashManifest {
39 window: usize,
40 timescale: u64,
41 low_latency: bool,
42 part_target: f64,
43 availability_start: u64, segments: VecDeque<(u64, TimelineEntry)>, codecs: Option<String>,
46 width: u16,
47 height: u16,
48 bandwidth: u32,
49 init_uri: String,
50 media_template: String,
51 finished: bool,
52}
53
54impl DashManifest {
55 pub fn new(window: usize) -> Self {
57 Self {
58 window: window.max(1),
59 timescale: 1000,
60 low_latency: false,
61 part_target: 0.0,
62 availability_start: SystemTime::now()
63 .duration_since(UNIX_EPOCH)
64 .map(|d| d.as_secs())
65 .unwrap_or(0),
66 segments: VecDeque::new(),
67 codecs: None,
68 width: 0,
69 height: 0,
70 bandwidth: 0,
71 init_uri: "init.m4s".into(),
72 media_template: "seg$Number$.m4s".into(),
73 finished: false,
74 }
75 }
76
77 pub fn low_latency(mut self, part_target: f64) -> Self {
81 self.low_latency = true;
82 self.part_target = part_target.max(0.05);
83 self
84 }
85
86 pub fn set_media_info(
89 &mut self,
90 codecs: Option<String>,
91 width: u16,
92 height: u16,
93 bandwidth: u32,
94 ) {
95 self.codecs = codecs;
96 self.width = width;
97 self.height = height;
98 self.bandwidth = bandwidth;
99 }
100
101 pub fn set_uris(&mut self, init_uri: impl Into<String>, media_template: impl Into<String>) {
103 self.init_uri = init_uri.into();
104 self.media_template = media_template.into();
105 }
106
107 pub fn push(&mut self, seq: u64, start_ms: u64, dur_ms: u64) {
109 self.segments
110 .push_back((seq, TimelineEntry { start_ms, dur_ms }));
111 while self.segments.len() > self.window {
112 self.segments.pop_front();
113 }
114 }
115
116 pub fn finish(&mut self) {
119 self.finished = true;
120 }
121
122 fn start_number(&self) -> u64 {
125 self.segments.front().map(|(seq, _)| *seq).unwrap_or(0)
126 }
127
128 fn max_seg_secs(&self) -> f64 {
130 self.segments
131 .iter()
132 .map(|(_, t)| t.dur_ms)
133 .max()
134 .unwrap_or(0) as f64
135 / 1000.0
136 }
137
138 pub fn render(&self) -> String {
140 let mut s = String::with_capacity(512 + self.segments.len() * 48);
141 s.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
142
143 let mpd_type = if self.finished { "static" } else { "dynamic" };
144 let profile = "urn:mpeg:dash:profile:isoff-live:2011";
145 s.push_str("<MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\" ");
146 let _ = write!(s, "type=\"{mpd_type}\" profiles=\"{profile}\" ");
149 s.push_str("minBufferTime=\"PT1.5S\" ");
150 if !self.finished {
151 let _ = write!(
152 s,
153 "availabilityStartTime=\"{}\" ",
154 unix_to_iso8601(self.availability_start)
155 );
156 s.push_str("minimumUpdatePeriod=\"PT1S\" ");
157 }
158 let _ = writeln!(s, "maxSegmentDuration=\"PT{:.3}S\">", self.max_seg_secs());
159
160 s.push_str(" <Period id=\"0\" start=\"PT0S\">\n");
161 s.push_str(" <AdaptationSet mimeType=\"video/mp4\" segmentAlignment=\"true\" startWithSAP=\"1\">\n");
162 let _ = write!(
163 s,
164 " <Representation id=\"v0\" bandwidth=\"{}\"",
165 self.bandwidth
166 );
167 if self.width > 0 && self.height > 0 {
168 let _ = write!(s, " width=\"{}\" height=\"{}\"", self.width, self.height);
169 }
170 if let Some(codecs) = &self.codecs {
171 let _ = write!(s, " codecs=\"{codecs}\"");
172 }
173 s.push_str(">\n");
174
175 let _ = write!(
176 s,
177 " <SegmentTemplate timescale=\"{}\" initialization=\"{}\" media=\"{}\" startNumber=\"{}\"",
178 self.timescale,
179 self.init_uri,
180 self.media_template,
181 self.start_number()
182 );
183 if self.low_latency {
184 let _ = write!(
185 s,
186 " availabilityTimeOffset=\"{:.3}\" availabilityTimeComplete=\"false\"",
187 (self.max_seg_secs() - self.part_target).max(0.0)
188 );
189 }
190 s.push_str(">\n");
191 s.push_str(" <SegmentTimeline>\n");
192 for (i, (_, t)) in self.segments.iter().enumerate() {
196 if i == 0 {
197 let _ = writeln!(
198 s,
199 " <S t=\"{}\" d=\"{}\"/>",
200 t.start_ms, t.dur_ms
201 );
202 } else {
203 let _ = writeln!(s, " <S d=\"{}\"/>", t.dur_ms);
204 }
205 }
206 s.push_str(" </SegmentTimeline>\n");
207 s.push_str(" </SegmentTemplate>\n");
208 s.push_str(" </Representation>\n");
209 s.push_str(" </AdaptationSet>\n");
210 s.push_str(" </Period>\n");
211 s.push_str("</MPD>\n");
212 s
213 }
214}
215
216fn unix_to_iso8601(secs: u64) -> String {
219 let days = (secs / 86_400) as i64;
220 let rem = secs % 86_400;
221 let (h, m, sec) = (rem / 3600, (rem % 3600) / 60, rem % 60);
222
223 let z = days + 719_468;
224 let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
225 let doe = z - era * 146_097;
226 let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
227 let y = yoe + era * 400;
228 let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
229 let mp = (5 * doy + 2) / 153;
230 let d = doy - (153 * mp + 2) / 5 + 1;
231 let month = if mp < 10 { mp + 3 } else { mp - 9 };
232 let year = if month <= 2 { y + 1 } else { y };
233 format!("{year:04}-{month:02}-{d:02}T{h:02}:{m:02}:{sec:02}Z")
234}
235
236pub struct DashPackager<M: Muxer, S: StorageBackend> {
240 engine: SegmentEngine<M, S>,
241 manifest: DashManifest,
242 seg_start_ms: u64,
243}
244
245impl<M: Muxer, S: StorageBackend> DashPackager<M, S> {
246 pub fn new(
249 muxer: M,
250 storage: S,
251 prefix: impl Into<String>,
252 target_duration: u64,
253 window: usize,
254 ) -> Self {
255 Self {
256 engine: SegmentEngine::new(muxer, storage, prefix, target_duration),
257 manifest: DashManifest::new(window),
258 seg_start_ms: 0,
259 }
260 }
261
262 pub fn low_latency(mut self, part_target: f64) -> Self {
264 self.manifest = self.manifest.low_latency(part_target);
265 self
266 }
267
268 pub fn manifest_key(&self) -> String {
270 format!("{}/manifest.mpd", self.engine.prefix)
271 }
272
273 async fn ensure_init(&mut self, frame: &MediaFrame) -> Result<()> {
274 if let Some(uri) = self.engine.ensure_init(frame).await? {
275 self.manifest.set_uris(
276 uri,
277 format!("seg$Number$.{}", self.engine.muxer.extension()),
278 );
279 self.manifest
281 .set_media_info(self.engine.muxer.codec_string(), 0, 0, BANDWIDTH_HINT);
282 }
283 Ok(())
284 }
285
286 async fn cut(&mut self, duration: f64) -> Result<()> {
287 let bytes = self.engine.muxer.finish_segment()?;
288 if bytes.is_empty() {
289 return Ok(());
290 }
291 let uri = self.engine.segment_uri(self.engine.seq);
292 let key = self.engine.key(&uri);
293 self.engine.storage.put(&key, bytes).await?;
294 let dur_ms = (duration * 1000.0) as u64;
295 self.manifest
296 .push(self.engine.seq, self.seg_start_ms, dur_ms);
297 self.seg_start_ms += dur_ms;
298 self.write_manifest().await?;
299 self.engine.seq += 1;
300 Ok(())
301 }
302
303 async fn write_manifest(&mut self) -> Result<()> {
304 let body = self.manifest.render();
305 self.engine
306 .storage
307 .put(
308 &self.engine.key("manifest.mpd"),
309 Bytes::from(body.into_bytes()),
310 )
311 .await
312 }
313}
314
315#[async_trait]
316impl<M: Muxer, S: StorageBackend> Packager for DashPackager<M, S> {
317 async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
318 self.ensure_init(frame).await?;
319 let decision = self.engine.observe(frame);
320 if decision.skip {
321 return Ok(());
322 }
323 if let Some(duration) = decision.cut_previous {
324 self.cut(duration).await?;
325 }
326 if decision.open_new {
327 self.engine.muxer.start_segment()?;
328 }
329 self.engine.muxer.write(frame)?;
330 Ok(())
331 }
332
333 async fn finish(&mut self) -> Result<()> {
334 if let Some(duration) = self.engine.flush() {
335 self.cut(duration).await?;
336 }
337 self.manifest.finish();
338 self.write_manifest().await
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::packager::PassthroughMuxer;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Epoch, a year boundary, and a timestamp with non-zero h/m/s fields.
    #[test]
    fn iso8601_known_vector() {
        assert_eq!(unix_to_iso8601(0), "1970-01-01T00:00:00Z");
        assert_eq!(unix_to_iso8601(1_609_459_200), "2021-01-01T00:00:00Z");
        assert_eq!(
            unix_to_iso8601(1_609_459_200 + 3661),
            "2021-01-01T01:01:01Z"
        );
    }

    /// An unfinished manifest is dynamic and lists its segments in a timeline.
    #[test]
    fn manifest_renders_dynamic_mpd_with_timeline() {
        let mut manifest = DashManifest::new(3);
        manifest.set_media_info(Some("avc1.42001f".into()), 1280, 720, 2_000_000);
        manifest.push(0, 0, 2000);
        manifest.push(1, 2000, 1980);

        let xml = manifest.render();
        assert!(xml.contains("type=\"dynamic\""));
        assert!(xml.contains("availabilityStartTime="));
        assert!(xml.contains("<SegmentTimeline>"));
        assert!(xml.contains("<S t=\"0\" d=\"2000\"/>"));
        assert!(xml.contains("<S d=\"1980\"/>"));
        assert!(xml.contains("codecs=\"avc1.42001f\""));
        assert!(xml.contains("width=\"1280\" height=\"720\""));
    }

    /// Low-latency mode adds the availability offset attributes.
    #[test]
    fn low_latency_emits_availability_time_offset() {
        let mut manifest = DashManifest::new(3).low_latency(0.5);
        manifest.push(0, 0, 2000);

        let xml = manifest.render();
        assert!(xml.contains("availabilityTimeOffset="));
        assert!(xml.contains("availabilityTimeComplete=\"false\""));
    }

    /// After `finish` the MPD is static and no longer asks for refreshes.
    #[test]
    fn finish_marks_static() {
        let mut manifest = DashManifest::new(3);
        manifest.push(0, 0, 2000);
        manifest.finish();

        let xml = manifest.render();
        assert!(xml.contains("type=\"static\""));
        assert!(!xml.contains("minimumUpdatePeriod"));
    }

    /// End-to-end: frames in, segment objects and a final manifest out.
    #[tokio::test]
    async fn packager_writes_segments_and_mpd() {
        let storage = InMemoryStorage::new();
        let mut packager = DashPackager::new(
            PassthroughMuxer::new("m4s"),
            storage.clone(),
            "live/dash",
            2,
            5,
        );

        // Four keyframes one second apart, each followed by a delta frame.
        for index in 0..4 {
            let keyframe_pts = index * 1000;
            packager
                .push(&video_frame(keyframe_pts, true))
                .await
                .unwrap();
            packager
                .push(&video_frame(keyframe_pts + 500, false))
                .await
                .unwrap();
        }
        packager.finish().await.unwrap();

        let raw = storage.get("live/dash/manifest.mpd").await.unwrap();
        let mpd = String::from_utf8(raw.to_vec()).unwrap();
        assert!(mpd.contains("<MPD"));
        assert!(mpd.contains("type=\"static\""));
        assert!(mpd.contains("<SegmentTimeline>"));
        assert!(storage.get("live/dash/seg0.m4s").await.is_ok());
    }
}