Skip to main content

arcly_stream/packager/
dash.rs

1//! DASH / LL-DASH manifest (`.mpd`) generation and packaging.
2//!
3//! Reuses the [`Muxer`](super::Muxer) / [`Packager`](super::Packager) contracts:
4//! the media segments are ordinary CMAF (fMP4) — produce them with
5//! [`Fmp4Muxer`](super::Fmp4Muxer) — and [`DashManifest`] renders a dynamic
6//! (live) MPD using a `SegmentTemplate` + `SegmentTimeline`, which describes
7//! variable, keyframe-aligned segment durations exactly.
8//!
9//! Low-latency DASH is enabled with [`DashManifest::low_latency`] /
10//! [`DashPackager::low_latency`]: the MPD carries `availabilityTimeOffset` so a
11//! player can fetch the in-progress segment as it is produced (chunked CMAF —
12//! the chunks are the same `moof`+`mdat` fragments [`Muxer::take_partial`]
13//! yields; the actual chunked HTTP transfer is the delivery layer's job).
14
15use std::collections::VecDeque;
16use std::fmt::Write as _;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use super::engine::{SegmentEngine, BANDWIDTH_HINT};
20use super::{Muxer, Packager};
21use crate::traits::StorageBackend;
22use crate::{MediaFrame, Result};
23use async_trait::async_trait;
24use bytes::Bytes;
25
/// One segment's timing in the MPD `SegmentTimeline` (milliseconds).
#[derive(Debug, Clone, Copy)]
struct TimelineEntry {
    /// Presentation time at which the segment starts.
    start_ms: u64,
    /// Duration of the segment.
    dur_ms: u64,
}

impl TimelineEntry {
    /// Presentation time at which the segment ends (saturating on overflow).
    fn end_ms(&self) -> u64 {
        self.start_ms.saturating_add(self.dur_ms)
    }
}

/// A dynamic (live) DASH media-presentation-description generator.
///
/// Holds a sliding window of segment timings and renders an MPD with a
/// number-addressed `SegmentTemplate` and an explicit `SegmentTimeline`.
#[derive(Debug, Clone)]
pub struct DashManifest {
    /// Maximum number of segments kept in `segments` (always at least 1).
    window: usize,
    /// Units per second used for `t`/`d` in the timeline (1000 = milliseconds).
    timescale: u64,
    /// Whether the LL-DASH availability attributes are emitted.
    low_latency: bool,
    /// Part (chunk) target in seconds; only meaningful when `low_latency`.
    part_target: f64,
    /// Wall-clock creation time, rendered as `availabilityStartTime`.
    availability_start: u64,                  // unix seconds
    /// Retained segments, oldest first.
    segments: VecDeque<(u64, TimelineEntry)>, // (seq, timing)
    /// Optional `codecs` attribute of the `Representation`.
    codecs: Option<String>,
    /// Pixel width; omitted from the MPD together with `height` when either is 0.
    width: u16,
    /// Pixel height; omitted from the MPD together with `width` when either is 0.
    height: u16,
    /// Nominal bandwidth in bits/sec for the `Representation`.
    bandwidth: u32,
    /// `SegmentTemplate@initialization` value.
    init_uri: String,
    /// `SegmentTemplate@media` value (expected to contain `$Number$`).
    media_template: String,
    /// Set by [`DashManifest::finish`]; switches rendering to `type="static"`.
    finished: bool,
}

impl DashManifest {
    /// A live manifest retaining `window` segments, timed in milliseconds.
    ///
    /// `window` is clamped to at least 1. `availabilityStartTime` is captured
    /// from the system clock at construction (0 if the clock reads before the
    /// Unix epoch).
    pub fn new(window: usize) -> Self {
        Self {
            window: window.max(1),
            timescale: 1000,
            low_latency: false,
            part_target: 0.0,
            availability_start: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            segments: VecDeque::new(),
            codecs: None,
            width: 0,
            height: 0,
            bandwidth: 0,
            init_uri: "init.m4s".into(),
            media_template: "seg$Number$.m4s".into(),
            finished: false,
        }
    }

    /// Enable LL-DASH with a part (chunk) target of `part_target` seconds
    /// (clamped to at least 0.05).
    ///
    /// The rendered `availabilityTimeOffset` is the longest segment in the
    /// window minus `part_target` (never negative), paired with
    /// `availabilityTimeComplete="false"`, letting players read the open
    /// segment as its chunks are produced.
    pub fn low_latency(mut self, part_target: f64) -> Self {
        self.low_latency = true;
        self.part_target = part_target.max(0.05);
        self
    }

    /// Set the codec string (`avc1.*` / `hvc1.*`), pixel dimensions, and a
    /// nominal bandwidth (bits/sec) for the `Representation`.
    pub fn set_media_info(
        &mut self,
        codecs: Option<String>,
        width: u16,
        height: u16,
        bandwidth: u32,
    ) {
        self.codecs = codecs;
        self.width = width;
        self.height = height;
        self.bandwidth = bandwidth;
    }

    /// Override the init-segment and media-segment template URIs.
    ///
    /// Pass raw URIs: XML escaping is applied when the MPD is rendered.
    pub fn set_uris(&mut self, init_uri: impl Into<String>, media_template: impl Into<String>) {
        self.init_uri = init_uri.into();
        self.media_template = media_template.into();
    }

    /// Append a segment timing, evicting the oldest beyond the window.
    pub fn push(&mut self, seq: u64, start_ms: u64, dur_ms: u64) {
        self.segments
            .push_back((seq, TimelineEntry { start_ms, dur_ms }));
        while self.segments.len() > self.window {
            self.segments.pop_front();
        }
    }

    /// Mark the presentation static (VOD): subsequent renders use
    /// `type="static"`, drop `availabilityStartTime` / `minimumUpdatePeriod`,
    /// and carry `mediaPresentationDuration` instead.
    pub fn finish(&mut self) {
        self.finished = true;
    }

    /// The media sequence number of the first segment in the window (the MPD
    /// `startNumber`); 0 while the window is empty.
    fn start_number(&self) -> u64 {
        self.segments.front().map(|(seq, _)| *seq).unwrap_or(0)
    }

    /// Largest segment duration in the window (seconds), for `maxSegmentDuration`.
    fn max_seg_secs(&self) -> f64 {
        self.segments
            .iter()
            .map(|(_, t)| t.dur_ms)
            .max()
            .unwrap_or(0) as f64
            / 1000.0
    }

    /// End of the last retained segment in seconds, measured on the timeline
    /// that starts at the Period start (`PT0S`); 0 while the window is empty.
    fn presentation_end_secs(&self) -> f64 {
        let end = self.segments.back().map(|(_, t)| t.end_ms()).unwrap_or(0);
        end as f64 / self.timescale as f64
    }

    /// Render the MPD document.
    ///
    /// Caller-supplied strings (`codecs`, init URI, media template) are
    /// XML-escaped so that e.g. a query string containing `&` cannot produce a
    /// malformed document.
    pub fn render(&self) -> String {
        let mut s = String::with_capacity(512 + self.segments.len() * 48);
        s.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");

        let mpd_type = if self.finished { "static" } else { "dynamic" };
        let profile = "urn:mpeg:dash:profile:isoff-live:2011";
        s.push_str("<MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\" ");
        // `write!` formats straight into the buffer; `format!` would allocate a
        // throwaway String per line, and this renders on every segment cut.
        let _ = write!(s, "type=\"{mpd_type}\" profiles=\"{profile}\" ");
        s.push_str("minBufferTime=\"PT1.5S\" ");
        if self.finished {
            // A static MPD has no `minimumUpdatePeriod`, so the total duration
            // must be stated explicitly.
            let _ = write!(
                s,
                "mediaPresentationDuration=\"PT{:.3}S\" ",
                self.presentation_end_secs()
            );
        } else {
            let _ = write!(
                s,
                "availabilityStartTime=\"{}\" ",
                unix_to_iso8601(self.availability_start)
            );
            s.push_str("minimumUpdatePeriod=\"PT1S\" ");
        }
        let _ = writeln!(s, "maxSegmentDuration=\"PT{:.3}S\">", self.max_seg_secs());

        s.push_str("  <Period id=\"0\" start=\"PT0S\">\n");
        s.push_str("    <AdaptationSet mimeType=\"video/mp4\" segmentAlignment=\"true\" startWithSAP=\"1\">\n");
        let _ = write!(
            s,
            "      <Representation id=\"v0\" bandwidth=\"{}\"",
            self.bandwidth
        );
        if self.width > 0 && self.height > 0 {
            let _ = write!(s, " width=\"{}\" height=\"{}\"", self.width, self.height);
        }
        if let Some(codecs) = &self.codecs {
            s.push_str(" codecs=\"");
            push_xml_attr(&mut s, codecs);
            s.push('"');
        }
        s.push_str(">\n");

        let _ = write!(
            s,
            "        <SegmentTemplate timescale=\"{}\" initialization=\"",
            self.timescale
        );
        push_xml_attr(&mut s, &self.init_uri);
        s.push_str("\" media=\"");
        push_xml_attr(&mut s, &self.media_template);
        let _ = write!(s, "\" startNumber=\"{}\"", self.start_number());
        if self.low_latency {
            let _ = write!(
                s,
                " availabilityTimeOffset=\"{:.3}\" availabilityTimeComplete=\"false\"",
                (self.max_seg_secs() - self.part_target).max(0.0)
            );
        }
        s.push_str(">\n");
        s.push_str("          <SegmentTimeline>\n");
        // Emit an explicit <S> per segment. `t` is stated on the first entry
        // (anchoring the window after eviction) and restated whenever an entry
        // does not start exactly where the previous one ended; an omitted `t`
        // means "previous t + previous d", which would silently shift every
        // segment after a gap.
        let mut expected_start: Option<u64> = None;
        for (_, t) in &self.segments {
            if expected_start == Some(t.start_ms) {
                let _ = writeln!(s, "            <S d=\"{}\"/>", t.dur_ms);
            } else {
                let _ = writeln!(
                    s,
                    "            <S t=\"{}\" d=\"{}\"/>",
                    t.start_ms, t.dur_ms
                );
            }
            expected_start = Some(t.end_ms());
        }
        s.push_str("          </SegmentTimeline>\n");
        s.push_str("        </SegmentTemplate>\n");
        s.push_str("      </Representation>\n");
        s.push_str("    </AdaptationSet>\n");
        s.push_str("  </Period>\n");
        s.push_str("</MPD>\n");
        s
    }
}

/// Append `value` to `out`, escaped for use inside a double-quoted XML
/// attribute (`&`, `<`, `>` and `"` become entity references).
fn push_xml_attr(out: &mut String, value: &str) {
    for ch in value.chars() {
        match ch {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            other => out.push(other),
        }
    }
}

/// Convert Unix seconds to an ISO-8601 UTC timestamp (`YYYY-MM-DDThh:mm:ssZ`),
/// dependency-free (Howard Hinnant's civil-from-days algorithm).
fn unix_to_iso8601(secs: u64) -> String {
    let days = (secs / 86_400) as i64;
    let rem = secs % 86_400;
    let (h, m, sec) = (rem / 3600, (rem % 3600) / 60, rem % 60);

    // Shift the epoch to 0000-03-01 so that leap days fall at the end of the
    // (March-based) year, then work in 400-year eras of 146 097 days.
    let z = days + 719_468;
    let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
    let doe = z - era * 146_097; // day of era, [0, 146096]
    let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let y = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of March-based year
    let mp = (5 * doy + 2) / 153; // March-based month, [0, 11]
    let d = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    // January and February belong to the next civil year.
    let year = if month <= 2 { y + 1 } else { y };
    format!("{year:04}-{month:02}-{d:02}T{h:02}:{m:02}:{sec:02}Z")
}
235
236/// Keyframe-boundary DASH packager writing CMAF segments + a live MPD through a
237/// [`StorageBackend`]. The DASH counterpart to
238/// [`HlsSegmenter`](super::HlsSegmenter).
239pub struct DashPackager<M: Muxer, S: StorageBackend> {
240    engine: SegmentEngine<M, S>,
241    manifest: DashManifest,
242    seg_start_ms: u64,
243}
244
245impl<M: Muxer, S: StorageBackend> DashPackager<M, S> {
246    /// New packager writing under `prefix`, `target_duration`-second segments,
247    /// a `window`-segment live MPD.
248    pub fn new(
249        muxer: M,
250        storage: S,
251        prefix: impl Into<String>,
252        target_duration: u64,
253        window: usize,
254    ) -> Self {
255        Self {
256            engine: SegmentEngine::new(muxer, storage, prefix, target_duration),
257            manifest: DashManifest::new(window),
258            seg_start_ms: 0,
259        }
260    }
261
262    /// Enable LL-DASH (`availabilityTimeOffset`) with the given part target.
263    pub fn low_latency(mut self, part_target: f64) -> Self {
264        self.manifest = self.manifest.low_latency(part_target);
265        self
266    }
267
268    /// The storage key of the MPD.
269    pub fn manifest_key(&self) -> String {
270        format!("{}/manifest.mpd", self.engine.prefix)
271    }
272
273    async fn ensure_init(&mut self, frame: &MediaFrame) -> Result<()> {
274        if let Some(uri) = self.engine.ensure_init(frame).await? {
275            self.manifest.set_uris(
276                uri,
277                format!("seg$Number$.{}", self.engine.muxer.extension()),
278            );
279            // Media info for the Representation, once the muxer can report it.
280            self.manifest
281                .set_media_info(self.engine.muxer.codec_string(), 0, 0, BANDWIDTH_HINT);
282        }
283        Ok(())
284    }
285
286    async fn cut(&mut self, duration: f64) -> Result<()> {
287        let bytes = self.engine.muxer.finish_segment()?;
288        if bytes.is_empty() {
289            return Ok(());
290        }
291        let uri = self.engine.segment_uri(self.engine.seq);
292        let key = self.engine.key(&uri);
293        self.engine.storage.put(&key, bytes).await?;
294        let dur_ms = (duration * 1000.0) as u64;
295        self.manifest
296            .push(self.engine.seq, self.seg_start_ms, dur_ms);
297        self.seg_start_ms += dur_ms;
298        self.write_manifest().await?;
299        self.engine.seq += 1;
300        Ok(())
301    }
302
303    async fn write_manifest(&mut self) -> Result<()> {
304        let body = self.manifest.render();
305        self.engine
306            .storage
307            .put(
308                &self.engine.key("manifest.mpd"),
309                Bytes::from(body.into_bytes()),
310            )
311            .await
312    }
313}
314
315#[async_trait]
316impl<M: Muxer, S: StorageBackend> Packager for DashPackager<M, S> {
317    async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
318        self.ensure_init(frame).await?;
319        let decision = self.engine.observe(frame);
320        if decision.skip {
321            return Ok(());
322        }
323        if let Some(duration) = decision.cut_previous {
324            self.cut(duration).await?;
325        }
326        if decision.open_new {
327            self.engine.muxer.start_segment()?;
328        }
329        self.engine.muxer.write(frame)?;
330        Ok(())
331    }
332
333    async fn finish(&mut self) -> Result<()> {
334        if let Some(duration) = self.engine.flush() {
335            self.cut(duration).await?;
336        }
337        self.manifest.finish();
338        self.write_manifest().await
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::packager::PassthroughMuxer;
    use crate::testing::{video_frame, InMemoryStorage};

    #[test]
    fn iso8601_known_vector() {
        // Epoch and a plain midnight / time-of-day pair.
        assert_eq!(unix_to_iso8601(0), "1970-01-01T00:00:00Z");
        assert_eq!(unix_to_iso8601(1_609_459_200), "2021-01-01T00:00:00Z");
        assert_eq!(
            unix_to_iso8601(1_609_459_200 + 3661),
            "2021-01-01T01:01:01Z"
        );
        // 2026-06-15T11:35:43Z → 1 781 523 343 seconds since epoch.
        assert_eq!(unix_to_iso8601(1_781_523_343), "2026-06-15T11:35:43Z");
        // Leap days (a century leap year and an ordinary one) and the last
        // second of a year exercise the March-based year arithmetic.
        assert_eq!(unix_to_iso8601(951_782_400), "2000-02-29T00:00:00Z");
        assert_eq!(unix_to_iso8601(1_709_164_800), "2024-02-29T00:00:00Z");
        assert_eq!(unix_to_iso8601(946_684_799), "1999-12-31T23:59:59Z");
    }

    #[test]
    fn manifest_renders_dynamic_mpd_with_timeline() {
        let mut m = DashManifest::new(3);
        m.set_media_info(Some("avc1.42001f".into()), 1280, 720, 2_000_000);
        m.push(0, 0, 2000);
        m.push(1, 2000, 1980);
        let out = m.render();
        assert!(out.contains("type=\"dynamic\""));
        assert!(out.contains("availabilityStartTime="));
        assert!(out.contains("<SegmentTimeline>"));
        assert!(out.contains("<S t=\"0\" d=\"2000\"/>"));
        assert!(out.contains("<S d=\"1980\"/>"));
        assert!(out.contains("codecs=\"avc1.42001f\""));
        assert!(out.contains("width=\"1280\" height=\"720\""));
    }

    #[test]
    fn low_latency_emits_availability_time_offset() {
        let mut m = DashManifest::new(3).low_latency(0.5);
        m.push(0, 0, 2000);
        let out = m.render();
        assert!(out.contains("availabilityTimeOffset="));
        assert!(out.contains("availabilityTimeComplete=\"false\""));
    }

    #[test]
    fn finish_marks_static() {
        let mut m = DashManifest::new(3);
        m.push(0, 0, 2000);
        m.finish();
        let out = m.render();
        assert!(out.contains("type=\"static\""));
        assert!(!out.contains("minimumUpdatePeriod"));
    }

    #[tokio::test]
    async fn packager_writes_segments_and_mpd() {
        let store = InMemoryStorage::new();
        let mut dash = DashPackager::new(
            PassthroughMuxer::new("m4s"),
            store.clone(),
            "live/dash",
            2,
            5,
        );

        // Four keyframes one second apart, each followed by a delta frame.
        for i in 0..4 {
            let pts = i * 1000;
            dash.push(&video_frame(pts, true)).await.unwrap();
            dash.push(&video_frame(pts + 500, false)).await.unwrap();
        }
        dash.finish().await.unwrap();

        let mpd =
            String::from_utf8(store.get("live/dash/manifest.mpd").await.unwrap().to_vec()).unwrap();
        assert!(mpd.contains("<MPD"));
        assert!(mpd.contains("type=\"static\"")); // finished
        assert!(mpd.contains("<SegmentTimeline>"));
        assert!(store.get("live/dash/seg0.m4s").await.is_ok());
    }
}