use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
#[derive(Debug, Clone)]
pub struct LlDashStreamConfig {
pub segment_duration_ms: u64,
pub chunk_duration_ms: u64,
pub availability_time_offset_ms: u64,
pub service_location: String,
pub target_latency_ms: u64,
}
impl LlDashStreamConfig {
#[must_use]
pub fn default_config() -> Self {
Self {
segment_duration_ms: 2000,
chunk_duration_ms: 500,
availability_time_offset_ms: 1800,
service_location: String::new(),
target_latency_ms: 3000,
}
}
#[must_use]
pub fn ultra_low_latency() -> Self {
Self {
segment_duration_ms: 1000,
chunk_duration_ms: 250,
availability_time_offset_ms: 750,
service_location: String::new(),
target_latency_ms: 1500,
}
}
pub fn validate(&self) -> Result<(), String> {
if self.chunk_duration_ms == 0 {
return Err("chunk_duration_ms must be greater than zero".to_owned());
}
if self.segment_duration_ms % self.chunk_duration_ms != 0 {
return Err(format!(
"chunk_duration_ms ({}) must evenly divide segment_duration_ms ({})",
self.chunk_duration_ms, self.segment_duration_ms
));
}
if self.availability_time_offset_ms >= self.segment_duration_ms {
return Err(format!(
"availability_time_offset_ms ({}) must be less than segment_duration_ms ({})",
self.availability_time_offset_ms, self.segment_duration_ms
));
}
Ok(())
}
#[must_use]
pub fn expected_chunks_per_segment(&self) -> u32 {
if self.chunk_duration_ms == 0 {
return 1;
}
(self.segment_duration_ms / self.chunk_duration_ms) as u32
}
}
#[derive(Debug, Clone)]
pub struct DashChunk {
pub segment_index: u64,
pub chunk_index: u32,
pub data: Vec<u8>,
pub is_final: bool,
pub duration_ms: u64,
}
impl DashChunk {
#[must_use]
pub fn new(segment_index: u64, chunk_index: u32, data: Vec<u8>, duration_ms: u64) -> Self {
Self {
segment_index,
chunk_index,
data,
is_final: false,
duration_ms,
}
}
#[must_use]
pub fn mark_final(mut self) -> Self {
self.is_final = true;
self
}
}
pub struct ChunkedSegmentAssembler {
config: LlDashStreamConfig,
segments: HashMap<u64, Vec<DashChunk>>,
}
impl ChunkedSegmentAssembler {
#[must_use]
pub fn new(config: LlDashStreamConfig) -> Self {
Self {
config,
segments: HashMap::new(),
}
}
pub fn add_chunk(&mut self, chunk: DashChunk) -> bool {
let seg_idx = chunk.segment_index;
let is_final = chunk.is_final;
let entry = self.segments.entry(seg_idx).or_default();
entry.push(chunk);
let expected = self.config.expected_chunks_per_segment() as usize;
is_final || entry.len() >= expected
}
#[must_use]
pub fn is_segment_complete(&self, segment_index: u64) -> bool {
let expected = self.config.expected_chunks_per_segment() as usize;
match self.segments.get(&segment_index) {
None => false,
Some(chunks) => chunks.len() >= expected || chunks.iter().any(|c| c.is_final),
}
}
#[must_use]
pub fn get_segment_data(&self, segment_index: u64) -> Option<Vec<u8>> {
let chunks = self.segments.get(&segment_index)?;
if chunks.is_empty() {
return None;
}
let mut ordered: Vec<&DashChunk> = chunks.iter().collect();
ordered.sort_by_key(|c| c.chunk_index);
let total: usize = ordered.iter().map(|c| c.data.len()).sum();
let mut buf = Vec::with_capacity(total);
for c in ordered {
buf.extend_from_slice(&c.data);
}
Some(buf)
}
#[must_use]
pub fn chunks_received(&self, segment_index: u64) -> usize {
self.segments.get(&segment_index).map_or(0, Vec::len)
}
#[must_use]
pub fn expected_chunks_per_segment(&self) -> u32 {
self.config.expected_chunks_per_segment()
}
}
#[derive(Debug, Clone)]
pub struct DashRepresentation {
pub id: String,
pub bandwidth_bps: u64,
pub width: u32,
pub height: u32,
pub codecs: String,
}
#[must_use]
pub fn generate_ll_dash_mpd(
config: &LlDashStreamConfig,
duration_secs: f64,
representations: &[DashRepresentation],
) -> String {
let mut xml = String::with_capacity(2048);
let ato_secs = config.availability_time_offset_ms as f64 / 1000.0;
let target_latency_ms = config.target_latency_ms;
let seg_dur_secs = config.segment_duration_ms as f64 / 1000.0;
let chunk_dur_secs = config.chunk_duration_ms as f64 / 1000.0;
xml.push_str("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
xml.push_str("<MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\"\n");
xml.push_str(" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n");
xml.push_str(" type=\"dynamic\"\n");
let _ = writeln!(xml, " minimumUpdatePeriod=\"PT{chunk_dur_secs:.3}S\"");
let _ = writeln!(xml, " minBufferTime=\"PT{seg_dur_secs:.3}S\"");
let _ = writeln!(
xml,
" suggestedPresentationDelay=\"PT{:.3}S\"",
target_latency_ms as f64 / 1000.0
);
if duration_secs > 0.0 {
let _ = writeln!(
xml,
" mediaPresentationDuration=\"PT{duration_secs:.3}S\""
);
}
xml.push_str(
" profiles=\"urn:mpeg:dash:profile:isoff-live:2011,urn:mpeg:dash:profile:cmaf:2019\">\n",
);
xml.push_str(" <ServiceDescription id=\"0\">\n");
let _ = writeln!(
xml,
" <Latency target=\"{target_latency_ms}\" min=\"{}\" max=\"{}\"/>",
(target_latency_ms as f64 * 0.5) as u64,
target_latency_ms * 3
);
xml.push_str(" <PlaybackRate min=\"0.96\" max=\"1.04\"/>\n");
xml.push_str(" </ServiceDescription>\n");
xml.push_str(" <Period id=\"0\" start=\"PT0S\">\n");
xml.push_str(" <AdaptationSet mimeType=\"video/mp4\" contentType=\"video\">\n");
let _ = writeln!(
xml,
" <SegmentTemplate timescale=\"90000\"\n media=\"segment_$Number$.m4s\"\n initialization=\"init.mp4\"\n availabilityTimeOffset=\"{ato_secs:.3}\">"
);
xml.push_str(" <SegmentTimeline/>\n");
xml.push_str(" </SegmentTemplate>\n");
for rep in representations {
let _ = writeln!(
xml,
" <Representation id=\"{}\" bandwidth=\"{}\" width=\"{}\" height=\"{}\" codecs=\"{}\"/>",
rep.id, rep.bandwidth_bps, rep.width, rep.height, rep.codecs
);
}
xml.push_str(" </AdaptationSet>\n");
xml.push_str(" </Period>\n");
xml.push_str("</MPD>\n");
xml
}
#[cfg(test)]
mod tests {
use super::*;
fn default_cfg() -> LlDashStreamConfig {
LlDashStreamConfig::default_config()
}
#[test]
fn test_validate_bad_ratio() {
let mut cfg = default_cfg();
cfg.chunk_duration_ms = 300; assert!(cfg.validate().is_err());
}
#[test]
fn test_validate_ato_too_large() {
let mut cfg = default_cfg();
cfg.availability_time_offset_ms = 2000; assert!(cfg.validate().is_err());
}
#[test]
fn test_validate_zero_chunk() {
let mut cfg = default_cfg();
cfg.chunk_duration_ms = 0;
assert!(cfg.validate().is_err());
}
#[test]
fn test_expected_chunks_per_segment() {
let cfg = default_cfg(); assert_eq!(cfg.expected_chunks_per_segment(), 4);
}
#[test]
fn test_ull_chunks_per_segment() {
let cfg = LlDashStreamConfig::ultra_low_latency(); assert_eq!(cfg.expected_chunks_per_segment(), 4);
}
#[test]
fn test_add_chunk_completes_on_final() {
let cfg = default_cfg();
let mut asm = ChunkedSegmentAssembler::new(cfg);
let chunk = DashChunk::new(0, 0, vec![1, 2, 3], 500).mark_final();
let done = asm.add_chunk(chunk);
assert!(done);
}
#[test]
fn test_add_chunk_completes_on_count() {
let cfg = default_cfg(); let mut asm = ChunkedSegmentAssembler::new(cfg);
for i in 0..3u32 {
let done = asm.add_chunk(DashChunk::new(1, i, vec![i as u8], 500));
assert!(!done, "should not be done after chunk {i}");
}
let done = asm.add_chunk(DashChunk::new(1, 3, vec![3], 500));
assert!(done);
}
#[test]
fn test_get_segment_data_assembles() {
let cfg = default_cfg();
let mut asm = ChunkedSegmentAssembler::new(cfg);
asm.add_chunk(DashChunk::new(2, 1, vec![0xBB], 500));
asm.add_chunk(DashChunk::new(2, 0, vec![0xAA], 500));
let data = asm.get_segment_data(2).expect("should have data");
assert_eq!(data, vec![0xAA, 0xBB]);
}
#[test]
fn test_is_segment_complete() {
let cfg = default_cfg();
let mut asm = ChunkedSegmentAssembler::new(cfg);
assert!(!asm.is_segment_complete(0));
for i in 0..4u32 {
asm.add_chunk(DashChunk::new(0, i, vec![i as u8], 500));
}
assert!(asm.is_segment_complete(0));
}
#[test]
fn test_chunks_received() {
let cfg = default_cfg();
let mut asm = ChunkedSegmentAssembler::new(cfg);
assert_eq!(asm.chunks_received(5), 0);
asm.add_chunk(DashChunk::new(5, 0, vec![1], 500));
asm.add_chunk(DashChunk::new(5, 1, vec![2], 500));
assert_eq!(asm.chunks_received(5), 2);
}
#[test]
fn test_multiple_segments() {
let cfg = default_cfg();
let mut asm = ChunkedSegmentAssembler::new(cfg);
asm.add_chunk(DashChunk::new(0, 0, vec![0xAA], 500));
asm.add_chunk(DashChunk::new(1, 0, vec![0xBB], 500));
assert_eq!(asm.chunks_received(0), 1);
assert_eq!(asm.chunks_received(1), 1);
}
#[test]
fn test_mpd_contains_ato() {
let cfg = default_cfg(); let reps = vec![DashRepresentation {
id: "1".to_owned(),
bandwidth_bps: 2_000_000,
width: 1920,
height: 1080,
codecs: "avc1.640028".to_owned(),
}];
let mpd = generate_ll_dash_mpd(&cfg, 0.0, &reps);
assert!(mpd.contains("availabilityTimeOffset=\"1.800\""));
}
#[test]
fn test_mpd_contains_service_description() {
let cfg = default_cfg();
let mpd = generate_ll_dash_mpd(&cfg, 0.0, &[]);
assert!(mpd.contains("ServiceDescription"));
assert!(mpd.contains("Latency"));
assert!(mpd.contains("PlaybackRate"));
}
#[test]
fn test_chunk_mark_final() {
let c = DashChunk::new(0, 0, vec![], 500).mark_final();
assert!(c.is_final);
}
#[test]
fn test_assembler_expected_chunks() {
let cfg = LlDashStreamConfig::ultra_low_latency();
let asm = ChunkedSegmentAssembler::new(cfg);
assert_eq!(asm.expected_chunks_per_segment(), 4);
}
}