use std::collections::VecDeque;
use std::fmt::Write as _;
use std::time::{SystemTime, UNIX_EPOCH};
use super::engine::{SegmentEngine, BANDWIDTH_HINT};
use super::{Muxer, Packager};
use crate::traits::StorageBackend;
use crate::{MediaFrame, Result};
use async_trait::async_trait;
use bytes::Bytes;
#[derive(Debug, Clone, Copy)]
struct TimelineEntry {
start_ms: u64,
dur_ms: u64,
}
#[derive(Debug, Clone)]
pub struct DashManifest {
window: usize,
timescale: u64,
low_latency: bool,
part_target: f64,
availability_start: u64, segments: VecDeque<(u64, TimelineEntry)>, codecs: Option<String>,
width: u16,
height: u16,
bandwidth: u32,
init_uri: String,
media_template: String,
finished: bool,
}
impl DashManifest {
pub fn new(window: usize) -> Self {
Self {
window: window.max(1),
timescale: 1000,
low_latency: false,
part_target: 0.0,
availability_start: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0),
segments: VecDeque::new(),
codecs: None,
width: 0,
height: 0,
bandwidth: 0,
init_uri: "init.m4s".into(),
media_template: "seg$Number$.m4s".into(),
finished: false,
}
}
pub fn low_latency(mut self, part_target: f64) -> Self {
self.low_latency = true;
self.part_target = part_target.max(0.05);
self
}
pub fn set_media_info(
&mut self,
codecs: Option<String>,
width: u16,
height: u16,
bandwidth: u32,
) {
self.codecs = codecs;
self.width = width;
self.height = height;
self.bandwidth = bandwidth;
}
pub fn set_uris(&mut self, init_uri: impl Into<String>, media_template: impl Into<String>) {
self.init_uri = init_uri.into();
self.media_template = media_template.into();
}
pub fn push(&mut self, seq: u64, start_ms: u64, dur_ms: u64) {
self.segments
.push_back((seq, TimelineEntry { start_ms, dur_ms }));
while self.segments.len() > self.window {
self.segments.pop_front();
}
}
pub fn finish(&mut self) {
self.finished = true;
}
fn start_number(&self) -> u64 {
self.segments.front().map(|(seq, _)| *seq).unwrap_or(0)
}
fn max_seg_secs(&self) -> f64 {
self.segments
.iter()
.map(|(_, t)| t.dur_ms)
.max()
.unwrap_or(0) as f64
/ 1000.0
}
pub fn render(&self) -> String {
let mut s = String::with_capacity(512 + self.segments.len() * 48);
s.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
let mpd_type = if self.finished { "static" } else { "dynamic" };
let profile = "urn:mpeg:dash:profile:isoff-live:2011";
s.push_str("<MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\" ");
let _ = write!(s, "type=\"{mpd_type}\" profiles=\"{profile}\" ");
s.push_str("minBufferTime=\"PT1.5S\" ");
if !self.finished {
let _ = write!(
s,
"availabilityStartTime=\"{}\" ",
unix_to_iso8601(self.availability_start)
);
s.push_str("minimumUpdatePeriod=\"PT1S\" ");
}
let _ = writeln!(s, "maxSegmentDuration=\"PT{:.3}S\">", self.max_seg_secs());
s.push_str(" <Period id=\"0\" start=\"PT0S\">\n");
s.push_str(" <AdaptationSet mimeType=\"video/mp4\" segmentAlignment=\"true\" startWithSAP=\"1\">\n");
let _ = write!(
s,
" <Representation id=\"v0\" bandwidth=\"{}\"",
self.bandwidth
);
if self.width > 0 && self.height > 0 {
let _ = write!(s, " width=\"{}\" height=\"{}\"", self.width, self.height);
}
if let Some(codecs) = &self.codecs {
let _ = write!(s, " codecs=\"{codecs}\"");
}
s.push_str(">\n");
let _ = write!(
s,
" <SegmentTemplate timescale=\"{}\" initialization=\"{}\" media=\"{}\" startNumber=\"{}\"",
self.timescale,
self.init_uri,
self.media_template,
self.start_number()
);
if self.low_latency {
let _ = write!(
s,
" availabilityTimeOffset=\"{:.3}\" availabilityTimeComplete=\"false\"",
(self.max_seg_secs() - self.part_target).max(0.0)
);
}
s.push_str(">\n");
s.push_str(" <SegmentTimeline>\n");
for (i, (_, t)) in self.segments.iter().enumerate() {
if i == 0 {
let _ = writeln!(
s,
" <S t=\"{}\" d=\"{}\"/>",
t.start_ms, t.dur_ms
);
} else {
let _ = writeln!(s, " <S d=\"{}\"/>", t.dur_ms);
}
}
s.push_str(" </SegmentTimeline>\n");
s.push_str(" </SegmentTemplate>\n");
s.push_str(" </Representation>\n");
s.push_str(" </AdaptationSet>\n");
s.push_str(" </Period>\n");
s.push_str("</MPD>\n");
s
}
}
fn unix_to_iso8601(secs: u64) -> String {
let days = (secs / 86_400) as i64;
let rem = secs % 86_400;
let (h, m, sec) = (rem / 3600, (rem % 3600) / 60, rem % 60);
let z = days + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = z - era * 146_097;
let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365;
let y = yoe + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let month = if mp < 10 { mp + 3 } else { mp - 9 };
let year = if month <= 2 { y + 1 } else { y };
format!("{year:04}-{month:02}-{d:02}T{h:02}:{m:02}:{sec:02}Z")
}
pub struct DashPackager<M: Muxer, S: StorageBackend> {
engine: SegmentEngine<M, S>,
manifest: DashManifest,
seg_start_ms: u64,
}
impl<M: Muxer, S: StorageBackend> DashPackager<M, S> {
pub fn new(
muxer: M,
storage: S,
prefix: impl Into<String>,
target_duration: u64,
window: usize,
) -> Self {
Self {
engine: SegmentEngine::new(muxer, storage, prefix, target_duration),
manifest: DashManifest::new(window),
seg_start_ms: 0,
}
}
pub fn low_latency(mut self, part_target: f64) -> Self {
self.manifest = self.manifest.low_latency(part_target);
self
}
pub fn manifest_key(&self) -> String {
format!("{}/manifest.mpd", self.engine.prefix)
}
async fn ensure_init(&mut self, frame: &MediaFrame) -> Result<()> {
if let Some(uri) = self.engine.ensure_init(frame).await? {
self.manifest.set_uris(
uri,
format!("seg$Number$.{}", self.engine.muxer.extension()),
);
self.manifest
.set_media_info(self.engine.muxer.codec_string(), 0, 0, BANDWIDTH_HINT);
}
Ok(())
}
async fn cut(&mut self, duration: f64) -> Result<()> {
let bytes = self.engine.muxer.finish_segment()?;
if bytes.is_empty() {
return Ok(());
}
let uri = self.engine.segment_uri(self.engine.seq);
let key = self.engine.key(&uri);
self.engine.storage.put(&key, bytes).await?;
let dur_ms = (duration * 1000.0) as u64;
self.manifest
.push(self.engine.seq, self.seg_start_ms, dur_ms);
self.seg_start_ms += dur_ms;
self.write_manifest().await?;
self.engine.seq += 1;
Ok(())
}
async fn write_manifest(&mut self) -> Result<()> {
let body = self.manifest.render();
self.engine
.storage
.put(
&self.engine.key("manifest.mpd"),
Bytes::from(body.into_bytes()),
)
.await
}
}
#[async_trait]
impl<M: Muxer, S: StorageBackend> Packager for DashPackager<M, S> {
async fn push(&mut self, frame: &MediaFrame) -> Result<()> {
self.ensure_init(frame).await?;
let decision = self.engine.observe(frame);
if decision.skip {
return Ok(());
}
if let Some(duration) = decision.cut_previous {
self.cut(duration).await?;
}
if decision.open_new {
self.engine.muxer.start_segment()?;
}
self.engine.muxer.write(frame)?;
Ok(())
}
async fn finish(&mut self) -> Result<()> {
if let Some(duration) = self.engine.flush() {
self.cut(duration).await?;
}
self.manifest.finish();
self.write_manifest().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::packager::PassthroughMuxer;
use crate::testing::{video_frame, InMemoryStorage};
#[test]
fn iso8601_known_vector() {
assert_eq!(unix_to_iso8601(0), "1970-01-01T00:00:00Z");
assert_eq!(unix_to_iso8601(1_609_459_200), "2021-01-01T00:00:00Z");
assert_eq!(
unix_to_iso8601(1_609_459_200 + 3661),
"2021-01-01T01:01:01Z"
);
}
#[test]
fn manifest_renders_dynamic_mpd_with_timeline() {
let mut m = DashManifest::new(3);
m.set_media_info(Some("avc1.42001f".into()), 1280, 720, 2_000_000);
m.push(0, 0, 2000);
m.push(1, 2000, 1980);
let out = m.render();
assert!(out.contains("type=\"dynamic\""));
assert!(out.contains("availabilityStartTime="));
assert!(out.contains("<SegmentTimeline>"));
assert!(out.contains("<S t=\"0\" d=\"2000\"/>"));
assert!(out.contains("<S d=\"1980\"/>"));
assert!(out.contains("codecs=\"avc1.42001f\""));
assert!(out.contains("width=\"1280\" height=\"720\""));
}
#[test]
fn low_latency_emits_availability_time_offset() {
let mut m = DashManifest::new(3).low_latency(0.5);
m.push(0, 0, 2000);
let out = m.render();
assert!(out.contains("availabilityTimeOffset="));
assert!(out.contains("availabilityTimeComplete=\"false\""));
}
#[test]
fn finish_marks_static() {
let mut m = DashManifest::new(3);
m.push(0, 0, 2000);
m.finish();
let out = m.render();
assert!(out.contains("type=\"static\""));
assert!(!out.contains("minimumUpdatePeriod"));
}
#[tokio::test]
async fn packager_writes_segments_and_mpd() {
let store = InMemoryStorage::new();
let mut dash = DashPackager::new(
PassthroughMuxer::new("m4s"),
store.clone(),
"live/dash",
2,
5,
);
for i in 0..4 {
let pts = i * 1000;
dash.push(&video_frame(pts, true)).await.unwrap();
dash.push(&video_frame(pts + 500, false)).await.unwrap();
}
dash.finish().await.unwrap();
let mpd =
String::from_utf8(store.get("live/dash/manifest.mpd").await.unwrap().to_vec()).unwrap();
assert!(mpd.contains("<MPD"));
assert!(mpd.contains("type=\"static\"")); assert!(mpd.contains("<SegmentTimeline>"));
assert!(store.get("live/dash/seg0.m4s").await.is_ok());
}
}