use crate::error::{NetError, NetResult};
use std::collections::VecDeque;
use std::time::{Duration, SystemTime};
/// Configuration for the low-latency DASH chunker.
///
/// A segment consists of `chunks_per_segment` chunks, each targeting
/// `chunk_duration_secs` seconds of media.
#[derive(Debug, Clone)]
pub struct LlDashChunkerConfig {
    /// Target duration of a single chunk, in seconds.
    pub chunk_duration_secs: f64,
    /// Number of chunks that make up one complete segment.
    pub chunks_per_segment: u32,
    /// Media timescale, in ticks per second.
    pub timescale: u32,
    /// Value written to the `id` attribute of the `Representation` element.
    pub representation_id: String,
    /// Value written to the `bandwidth` attribute of the `Representation` element.
    pub bandwidth_bps: u64,
    /// Value written to the `availabilityTimeOffset` attribute, in seconds.
    pub availability_time_offset: f64,
    /// Maximum number of completed segments the chunker retains.
    pub window_size: usize,
}

impl Default for LlDashChunkerConfig {
    /// 0.5 s chunks, four chunks per segment, 90 kHz timescale, 2 Mbit/s,
    /// a ten-segment window and an availability time offset of one segment
    /// minus one chunk.
    fn default() -> Self {
        let chunk = 0.5_f64;
        let chunks_per_seg = 4u32;
        let seg_dur = chunk * f64::from(chunks_per_seg);
        Self {
            chunk_duration_secs: chunk,
            chunks_per_segment: chunks_per_seg,
            timescale: 90_000,
            representation_id: "1".to_owned(),
            bandwidth_bps: 2_000_000,
            availability_time_offset: seg_dur - chunk,
            window_size: 10,
        }
    }
}

impl LlDashChunkerConfig {
    /// Builds a configuration with the given chunk duration and chunk count.
    ///
    /// The availability time offset is derived as the segment duration minus
    /// one chunk duration; every other field takes its [`Default`] value.
    #[must_use]
    pub fn new(chunk_duration_secs: f64, chunks_per_segment: u32) -> Self {
        let seg_dur = chunk_duration_secs * f64::from(chunks_per_segment);
        Self {
            chunk_duration_secs,
            chunks_per_segment,
            availability_time_offset: seg_dur - chunk_duration_secs,
            ..Self::default()
        }
    }

    /// Duration of a full segment in seconds (`chunk duration × chunks per segment`).
    #[must_use]
    pub fn segment_duration_secs(&self) -> f64 {
        self.chunk_duration_secs * f64::from(self.chunks_per_segment)
    }

    /// Chunk duration expressed in timescale ticks, rounded to the nearest tick.
    ///
    /// Rounding (rather than truncating) matters because many decimal
    /// durations are not exactly representable in `f64`: `0.7 * 90_000`
    /// evaluates to `62999.99999999999`, which truncation would turn into
    /// 62999 ticks instead of 63000. The float-to-integer cast saturates, so
    /// negative or NaN durations yield 0.
    #[must_use]
    pub fn chunk_duration_ticks(&self) -> u64 {
        (self.chunk_duration_secs * f64::from(self.timescale)).round() as u64
    }
}
/// One chunk of a DASH segment, as emitted by the chunker.
#[derive(Debug, Clone)]
pub struct DashChunk {
    /// Number of the segment this chunk belongs to.
    pub segment_number: u64,
    /// Position of this chunk within its segment.
    pub chunk_index: u32,
    /// Start time of the chunk, in timescale ticks.
    pub start_time_ticks: u64,
    /// Duration of the chunk, in timescale ticks.
    pub duration_ticks: u64,
    /// Duration of the chunk, in seconds.
    pub duration_secs: f64,
    /// Whether the chunk contains at least one keyframe.
    pub is_independent: bool,
    /// Whether this is the final chunk of its segment.
    pub is_last: bool,
    /// Payload bytes of the chunk.
    pub data: Vec<u8>,
    /// Wall-clock time at which the chunk was produced.
    pub produced_at: SystemTime,
    /// Offset of the first payload byte within the segment.
    pub byte_offset: u64,
}

impl DashChunk {
    /// Renders a `bytes <first>-<last>/<total>` range string for this chunk.
    ///
    /// `<last>` is the inclusive index of the final payload byte. The
    /// subtraction saturates, so an empty chunk at offset 0 is rendered as
    /// `0-0` rather than wrapping around.
    #[must_use]
    pub fn content_range(&self, total_segment_bytes: u64) -> String {
        let first = self.byte_offset;
        let past_end = first + self.data.len() as u64;
        let last = past_end.saturating_sub(1);
        format!("bytes {first}-{last}/{total_segment_bytes}")
    }

    /// Returns the segment number as a decimal string.
    #[must_use]
    pub fn segment_url_number(&self) -> String {
        format!("{}", self.segment_number)
    }
}
/// A finished segment together with the chunks it was assembled from.
#[derive(Debug, Clone)]
pub struct CompletedDashSegment {
    /// Segment number.
    pub number: u64,
    /// Start time of the segment, in timescale ticks.
    pub start_time_ticks: u64,
    /// Total duration of the segment, in timescale ticks.
    pub duration_ticks: u64,
    /// Total duration of the segment, in seconds.
    pub duration_secs: f64,
    /// The chunks that make up the segment, in emission order.
    pub chunks: Vec<DashChunk>,
    /// Wall-clock time at which the segment was finalized.
    pub finalized_at: SystemTime,
}

impl CompletedDashSegment {
    /// Renders the segment as a `SegmentTimeline` `<S t="…" d="…"/>` entry.
    #[must_use]
    pub fn to_timeline_s(&self) -> String {
        let Self {
            start_time_ticks: t,
            duration_ticks: d,
            ..
        } = self;
        format!("<S t=\"{t}\" d=\"{d}\"/>")
    }

    /// Sum of the payload sizes of all chunks in the segment, in bytes.
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        self.chunks
            .iter()
            .fold(0, |total, chunk| total + chunk.data.len() as u64)
    }
}
/// Builds the opening `<SegmentTemplate …>` tag (terminated by a newline).
///
/// The tag is left open; the caller is responsible for emitting the matching
/// `</SegmentTemplate>`. The availability time offset is printed with three
/// decimal places.
#[must_use]
pub fn segment_template_xml(config: &LlDashChunkerConfig) -> String {
    let mut tag = format!(
        r#"<SegmentTemplate timescale="{}" media="chunk_$Number$_$Time$.m4s" initialization="init.mp4" availabilityTimeOffset="{:.3}">"#,
        config.timescale, config.availability_time_offset,
    );
    tag.push('\n');
    tag
}
/// Renders a dynamic (live) MPD document for the given configuration and
/// completed segments.
///
/// * `minimumUpdatePeriod` is the chunk duration; `minBufferTime` is the
///   segment duration.
/// * The `Latency` element targets one segment duration (in milliseconds),
///   with half and double that as its minimum and maximum.
/// * Each completed segment contributes one `<S>` entry to the
///   `SegmentTimeline`.
/// * The `Representation` is emitted with a fixed 1920x1080 resolution.
#[must_use]
pub fn generate_ll_dash_mpd(
    config: &LlDashChunkerConfig,
    segments: &VecDeque<CompletedDashSegment>,
    availability_start: SystemTime,
) -> String {
    let start_stamp = format_system_time(availability_start);
    let segment_secs = config.segment_duration_secs();
    let refresh_secs = config.chunk_duration_secs;
    let latency_ms = segment_secs * 1000.0;
    let latency_target = latency_ms as u32;
    let latency_min = (latency_ms * 0.5) as u32;
    let latency_max = (latency_ms * 2.0) as u32;
    let rep_id = &config.representation_id;
    let bandwidth = config.bandwidth_bps;

    let mut doc = String::with_capacity(2048);

    // Document prolog and root element attributes.
    doc += "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";
    doc += "<MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\"\n";
    doc += " type=\"dynamic\"\n";
    doc += &format!(" minimumUpdatePeriod=\"PT{refresh_secs:.3}S\"\n");
    doc += &format!(" minBufferTime=\"PT{segment_secs:.1}S\"\n");
    doc += &format!(" availabilityStartTime=\"{start_stamp}\"\n");
    doc += " profiles=\"urn:mpeg:dash:profile:isoff-live:2011,urn:mpeg:dash:profile:cmaf:2019\">\n";

    // Low-latency service description.
    doc += " <ServiceDescription id=\"0\">\n";
    doc += &format!(
        " <Latency target=\"{latency_target}\" min=\"{latency_min}\" max=\"{latency_max}\"/>\n"
    );
    doc += " <PlaybackRate min=\"0.96\" max=\"1.04\"/>\n";
    doc += " </ServiceDescription>\n";

    // Single period with one video adaptation set.
    doc += " <Period id=\"0\" start=\"PT0S\">\n";
    doc += " <AdaptationSet mimeType=\"video/mp4\" contentType=\"video\">\n";
    doc += &segment_template_xml(config);
    doc += " <SegmentTimeline>\n";
    doc.extend(
        segments
            .iter()
            .map(|segment| format!(" {}\n", segment.to_timeline_s())),
    );
    doc += " </SegmentTimeline>\n";
    doc += " </SegmentTemplate>\n";
    doc += &format!(
        " <Representation id=\"{rep_id}\" bandwidth=\"{bandwidth}\" width=\"1920\" height=\"1080\"/>\n"
    );
    doc += " </AdaptationSet>\n";
    doc += " </Period>\n";
    doc += "</MPD>\n";
    doc
}
/// Formats `t` as a UTC timestamp of the form `YYYY-MM-DDThh:mm:ssZ`.
///
/// Sub-second precision is discarded. A time earlier than the Unix epoch is
/// reported as the epoch itself (`1970-01-01T00:00:00Z`).
fn format_system_time(t: SystemTime) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    match t.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => {
            let secs = elapsed.as_secs();
            let days = secs / SECS_PER_DAY;
            let rem = secs % SECS_PER_DAY;
            let h = rem / 3600;
            let m = (rem % 3600) / 60;
            let s = rem % 60;

            // Exact days -> proleptic Gregorian date conversion. The day count
            // is shifted so that it starts on 0000-03-01; with March as the
            // first month, the leap day becomes the last day of the "year",
            // which keeps the month arithmetic uniform. 146_097 is the number
            // of days in a 400-year Gregorian cycle ("era").
            let shifted = days + 719_468;
            let era = shifted / 146_097;
            let day_of_era = shifted % 146_097;
            let year_of_era =
                (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096)
                    / 365;
            let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
            // Month index counted from March (0 = March ... 11 = February).
            let month_from_march = (5 * day_of_year + 2) / 153;
            let day = day_of_year - (153 * month_from_march + 2) / 5 + 1;
            let mo = if month_from_march < 10 {
                month_from_march + 3
            } else {
                month_from_march - 9
            };
            // January and February belong to the following civil year.
            let y = year_of_era + era * 400 + u64::from(mo <= 2);

            format!("{y:04}-{mo:02}-{day:02}T{h:02}:{m:02}:{s:02}Z")
        }
        Err(_) => "1970-01-01T00:00:00Z".to_owned(),
    }
}
/// Collects the frames of the chunk currently being built.
#[derive(Debug, Default)]
struct ChunkAccumulator {
    /// Concatenated payload of every frame pushed so far.
    data: Vec<u8>,
    /// Sum of the pushed frame durations, in milliseconds.
    duration_ms: u64,
    /// Set once any pushed frame was a keyframe.
    has_keyframe: bool,
    /// Number of frames pushed since the last reset.
    frame_count: u32,
}

impl ChunkAccumulator {
    /// Appends one frame's payload and folds its duration and keyframe flag
    /// into the running totals.
    fn push(&mut self, data: &[u8], duration_ms: u64, is_keyframe: bool) {
        self.data.extend_from_slice(data);
        self.duration_ms += duration_ms;
        self.frame_count += 1;
        self.has_keyframe |= is_keyframe;
    }

    /// Returns the accumulator to its empty state, keeping the payload
    /// buffer's allocation.
    fn reset(&mut self) {
        let Self {
            data,
            duration_ms,
            has_keyframe,
            frame_count,
        } = self;
        data.clear();
        *duration_ms = 0;
        *has_keyframe = false;
        *frame_count = 0;
    }

    /// `true` when no frame has been pushed since the last reset.
    fn is_empty(&self) -> bool {
        self.frame_count == 0
    }
}
/// Splits a stream of encoded frames into low-latency DASH chunks and groups
/// those chunks into segments.
///
/// Frames are accumulated until either the configured chunk duration is
/// reached or a keyframe arrives, at which point a [`DashChunk`] is emitted.
/// After `chunks_per_segment` chunks the segment is finalized and stored in a
/// sliding window of at most `window_size` completed segments.
pub struct LlDashChunker {
    config: LlDashChunkerConfig,
    /// Number of the segment currently being built (starts at 1).
    segment_number: u64,
    /// Index the next emitted chunk receives within the current segment.
    chunk_index: u32,
    /// Start time of the next chunk, in timescale ticks.
    current_time_ticks: u64,
    /// Byte offset of the next chunk within the current segment.
    current_byte_offset: u64,
    /// Frames collected for the chunk in progress.
    accumulator: ChunkAccumulator,
    /// Chunks already emitted for the segment in progress.
    current_segment_chunks: Vec<DashChunk>,
    /// Emitted chunks not yet collected through `drain_chunks`.
    ready_chunks: VecDeque<DashChunk>,
    /// Sliding window of finalized segments.
    completed: VecDeque<CompletedDashSegment>,
    /// Wall-clock time captured when the chunker was created.
    availability_start: SystemTime,
    /// Presentation timestamp of the most recently pushed frame.
    last_pts_ms: Option<u64>,
    /// Duration assigned to the very first frame, which has no predecessor.
    default_frame_duration_ms: u64,
}

impl LlDashChunker {
    /// Creates a chunker for `config`, starting at segment 1 and using the
    /// current wall-clock time as the availability start time.
    #[must_use]
    pub fn new(config: LlDashChunkerConfig) -> Self {
        Self {
            config,
            segment_number: 1,
            chunk_index: 0,
            current_time_ticks: 0,
            current_byte_offset: 0,
            accumulator: ChunkAccumulator::default(),
            current_segment_chunks: Vec::new(),
            ready_chunks: VecDeque::new(),
            completed: VecDeque::new(),
            availability_start: SystemTime::now(),
            last_pts_ms: None,
            default_frame_duration_ms: 33,
        }
    }

    /// Creates a chunker with [`LlDashChunkerConfig::default`].
    #[must_use]
    pub fn default_chunker() -> Self {
        Self::new(LlDashChunkerConfig::default())
    }

    /// Number of the segment currently being built.
    #[must_use]
    pub fn current_segment_number(&self) -> u64 {
        self.segment_number
    }

    /// Index the next emitted chunk will have within the current segment.
    #[must_use]
    pub fn current_chunk_index(&self) -> u32 {
        self.chunk_index
    }

    /// Number of finalized segments currently held in the window.
    #[must_use]
    pub fn completed_segment_count(&self) -> usize {
        self.completed.len()
    }

    /// Number of emitted chunks waiting to be drained.
    #[must_use]
    pub fn ready_chunk_count(&self) -> usize {
        self.ready_chunks.len()
    }

    /// Feeds one encoded frame into the chunker.
    ///
    /// The frame's duration is taken as the gap to the previous frame's
    /// `pts_ms` (at least 1 ms, so repeated or backwards timestamps still
    /// advance time); the first frame gets the default frame duration.
    /// A keyframe closes the chunk in progress before being added, so it
    /// always starts a new chunk.
    pub fn push_frame(&mut self, data: &[u8], pts_ms: u64, is_keyframe: bool) {
        let dur_ms = match self.last_pts_ms {
            Some(prev) => pts_ms.saturating_sub(prev).max(1),
            None => self.default_frame_duration_ms,
        };
        self.last_pts_ms = Some(pts_ms);

        if is_keyframe && !self.accumulator.is_empty() {
            self.flush_chunk();
        }
        self.accumulator.push(data, dur_ms, is_keyframe);

        let target_ms = (self.config.chunk_duration_secs * 1000.0) as u64;
        if self.accumulator.duration_ms >= target_ms {
            self.flush_chunk();
        }
    }

    /// Emits the chunk in progress, if any frames have been accumulated.
    ///
    /// This does not finalize a partially filled segment; a segment is only
    /// finalized once its last chunk has been emitted.
    pub fn flush(&mut self) {
        if !self.accumulator.is_empty() {
            self.flush_chunk();
        }
    }

    /// Removes and returns every chunk emitted since the previous drain.
    pub fn drain_chunks(&mut self) -> Vec<DashChunk> {
        self.ready_chunks.drain(..).collect()
    }

    /// Removes and returns the oldest finalized segment, if there is one.
    pub fn take_completed_segment(&mut self) -> Option<CompletedDashSegment> {
        self.completed.pop_front()
    }

    /// Finalized segments currently held in the window, oldest first.
    #[must_use]
    pub fn completed_segments(&self) -> &VecDeque<CompletedDashSegment> {
        &self.completed
    }

    /// Renders the MPD for the current window of finalized segments.
    #[must_use]
    pub fn generate_mpd(&self) -> String {
        generate_ll_dash_mpd(&self.config, &self.completed, self.availability_start)
    }

    /// The configured availability time offset, in seconds.
    #[must_use]
    pub fn availability_time_offset(&self) -> f64 {
        self.config.availability_time_offset
    }

    /// Converts a duration in milliseconds to timescale ticks.
    ///
    /// Uses integer arithmetic so that results which are whole tick counts
    /// are exact: going through `f64` and truncating turns e.g. 700 ms at
    /// 90 kHz into 62999 ticks instead of 63000, and such one-tick losses
    /// accumulate in the timeline. Fractional results are floored; results
    /// that do not fit in `u64` saturate.
    fn ms_to_ticks(duration_ms: u64, timescale: u32) -> u64 {
        let ticks = u128::from(duration_ms) * u128::from(timescale) / 1000;
        u64::try_from(ticks).unwrap_or(u64::MAX)
    }

    /// Turns the accumulated frames into a [`DashChunk`], queues it, and
    /// finalizes the segment when this was its last chunk.
    fn flush_chunk(&mut self) {
        if self.accumulator.is_empty() {
            return;
        }

        let dur_ticks = Self::ms_to_ticks(self.accumulator.duration_ms, self.config.timescale);
        let dur_secs = self.accumulator.duration_ms as f64 / 1000.0;
        let is_last = self.chunk_index + 1 >= self.config.chunks_per_segment;
        let byte_offset = self.current_byte_offset;

        let chunk = DashChunk {
            segment_number: self.segment_number,
            chunk_index: self.chunk_index,
            start_time_ticks: self.current_time_ticks,
            duration_ticks: dur_ticks,
            duration_secs: dur_secs,
            is_independent: self.accumulator.has_keyframe,
            is_last,
            // Move the payload out instead of copying it.
            data: std::mem::take(&mut self.accumulator.data),
            produced_at: SystemTime::now(),
            byte_offset,
        };

        let chunk_size = chunk.data.len() as u64;
        self.current_byte_offset += chunk_size;
        self.current_time_ticks += dur_ticks;
        self.accumulator.reset();
        self.chunk_index += 1;

        // The chunk is owned both by the segment under construction and by
        // the queue of chunks awaiting delivery, hence the clone.
        self.current_segment_chunks.push(chunk.clone());
        self.ready_chunks.push_back(chunk);

        if is_last {
            self.finalize_segment();
        }
    }

    /// Packages the chunks of the current segment into a
    /// [`CompletedDashSegment`], trims the window to `window_size`, and
    /// resets the per-segment counters for the next segment.
    fn finalize_segment(&mut self) {
        let total_dur_ticks: u64 = self
            .current_segment_chunks
            .iter()
            .map(|c| c.duration_ticks)
            .sum();
        let total_dur_secs: f64 = self
            .current_segment_chunks
            .iter()
            .map(|c| c.duration_secs)
            .sum();
        let start = self
            .current_segment_chunks
            .first()
            .map_or(0, |c| c.start_time_ticks);

        let seg = CompletedDashSegment {
            number: self.segment_number,
            start_time_ticks: start,
            duration_ticks: total_dur_ticks,
            duration_secs: total_dur_secs,
            chunks: std::mem::take(&mut self.current_segment_chunks),
            finalized_at: SystemTime::now(),
        };
        self.completed.push_back(seg);

        // Drop the oldest segments once the window is over capacity.
        while self.completed.len() > self.config.window_size {
            self.completed.pop_front();
        }

        self.segment_number += 1;
        self.chunk_index = 0;
        self.current_byte_offset = 0;
    }
}
/// Compact debug view: only the segment number, the chunk index and the
/// number of chunks waiting to be drained are shown.
impl std::fmt::Debug for LlDashChunker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let pending = self.ready_chunks.len();
        let mut out = f.debug_struct("LlDashChunker");
        out.field("segment_number", &self.segment_number);
        out.field("chunk_index", &self.chunk_index);
        out.field("ready_chunks", &pending);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chunker with 100 ms chunks and three chunks per segment.
    fn default_chunker() -> LlDashChunker {
        let config = LlDashChunkerConfig::new(0.1, 3);
        LlDashChunker::new(config)
    }

    /// Pushes `count` 512-byte frames spaced 33 ms apart; only the first one
    /// is a keyframe.
    fn push_frames(chunker: &mut LlDashChunker, count: usize) {
        let payload = [0u8; 512];
        for idx in 0..count {
            chunker.push_frame(&payload, idx as u64 * 33, idx == 0);
        }
    }

    /// Pushes `frames` copies of `payload` spaced `pts_step_ms` apart, marking
    /// every `keyframe_every`-th frame (starting with the first) as a keyframe.
    fn feed_periodic(
        chunker: &mut LlDashChunker,
        frames: u64,
        payload: &[u8],
        pts_step_ms: u64,
        keyframe_every: u64,
    ) {
        for idx in 0..frames {
            chunker.push_frame(payload, idx * pts_step_ms, idx % keyframe_every == 0);
        }
    }

    #[test]
    fn test_config_default() {
        let config = LlDashChunkerConfig::default();
        assert!((config.chunk_duration_secs - 0.5).abs() < 1e-9);
        assert_eq!(config.chunks_per_segment, 4);
    }

    #[test]
    fn test_config_segment_duration() {
        let config = LlDashChunkerConfig::new(0.5, 4);
        assert!((config.segment_duration_secs() - 2.0).abs() < 1e-9);
    }

    #[test]
    fn test_config_ato() {
        let config = LlDashChunkerConfig::new(0.5, 4);
        assert!((config.availability_time_offset - 1.5).abs() < 1e-9);
    }

    #[test]
    fn test_chunk_duration_ticks() {
        let expected = (0.5 * 90_000.0) as u64;
        let actual = LlDashChunkerConfig::default().chunk_duration_ticks();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_chunker_initial_state() {
        let chunker = default_chunker();
        assert_eq!(chunker.current_segment_number(), 1);
        assert_eq!(chunker.current_chunk_index(), 0);
        assert_eq!(chunker.ready_chunk_count(), 0);
    }

    #[test]
    fn test_push_frame_no_immediate_chunk() {
        let mut chunker = default_chunker();
        chunker.push_frame(&[0u8; 512], 0, true);
        assert_eq!(chunker.ready_chunk_count(), 0);
    }

    #[test]
    fn test_frames_produce_chunk() {
        let mut chunker = default_chunker();
        let payload = [0u8; 256];
        for idx in 0..5u64 {
            chunker.push_frame(&payload, idx * 33, idx == 0);
        }
        let emitted = chunker.drain_chunks();
        assert!(!emitted.is_empty());
    }

    #[test]
    fn test_keyframe_forces_chunk_boundary() {
        // The chunk target (5 s) is far away, so only the keyframe can close
        // the first chunk.
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(5.0, 2));
        let payload = [0u8; 256];
        chunker.push_frame(&payload, 0, false);
        chunker.push_frame(&payload, 33, true);
        let emitted = chunker.drain_chunks();
        assert_eq!(emitted.len(), 1);
    }

    #[test]
    fn test_independent_flag() {
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(5.0, 2));
        let payload = [0u8; 256];
        chunker.push_frame(&payload, 0, true);
        chunker.push_frame(&payload, 33, false);
        chunker.push_frame(&payload, 66, true);
        let emitted = chunker.drain_chunks();
        if let Some(first) = emitted.first() {
            assert!(first.is_independent);
        }
    }

    #[test]
    fn test_chunk_segment_number() {
        let mut chunker = default_chunker();
        push_frames(&mut chunker, 5);
        for chunk in &chunker.drain_chunks() {
            assert_eq!(chunk.segment_number, 1);
        }
    }

    #[test]
    fn test_chunk_indices_sequential() {
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(0.1, 5));
        feed_periodic(&mut chunker, 8, &[0u8; 256], 33, 3);
        let emitted = chunker.drain_chunks();
        for (position, chunk) in emitted.iter().enumerate() {
            assert_eq!(chunk.chunk_index, position as u32);
        }
    }

    #[test]
    fn test_full_segment_produced() {
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(0.1, 3));
        feed_periodic(&mut chunker, 12, &[0u8; 256], 20, 4);
        let segment = chunker
            .take_completed_segment()
            .expect("should have segment");
        assert_eq!(segment.number, 1);
        assert_eq!(segment.chunks.len(), 3);
    }

    #[test]
    fn test_segment_number_increments() {
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(0.1, 2));
        feed_periodic(&mut chunker, 12, &[0u8; 256], 20, 3);
        assert!(chunker.current_segment_number() >= 2);
    }

    #[test]
    fn test_window_size_limit() {
        let config = LlDashChunkerConfig {
            window_size: 2,
            ..LlDashChunkerConfig::new(0.1, 2)
        };
        let mut chunker = LlDashChunker::new(config);
        feed_periodic(&mut chunker, 20, &[0u8; 256], 20, 3);
        assert!(chunker.completed_segment_count() <= 2);
    }

    #[test]
    fn test_explicit_flush() {
        let mut chunker = default_chunker();
        chunker.push_frame(&[0u8; 256], 0, true);
        chunker.flush();
        let flushed = chunker.ready_chunk_count() > 0 || chunker.current_chunk_index() > 0;
        assert!(flushed);
    }

    #[test]
    fn test_content_range_header() {
        let chunk = DashChunk {
            segment_number: 1,
            chunk_index: 0,
            start_time_ticks: 0,
            duration_ticks: 45000,
            duration_secs: 0.5,
            is_independent: true,
            is_last: false,
            data: vec![0u8; 1024],
            produced_at: SystemTime::now(),
            byte_offset: 0,
        };
        let header = chunk.content_range(10240);
        assert!(header.starts_with("bytes 0-"));
        assert!(header.contains("/10240"));
    }

    #[test]
    fn test_segment_url_number() {
        let chunk = DashChunk {
            segment_number: 42,
            chunk_index: 0,
            start_time_ticks: 0,
            duration_ticks: 0,
            duration_secs: 0.0,
            is_independent: false,
            is_last: false,
            data: vec![],
            produced_at: SystemTime::now(),
            byte_offset: 0,
        };
        assert_eq!(chunk.segment_url_number(), "42");
    }

    #[test]
    fn test_timeline_s_element() {
        let segment = CompletedDashSegment {
            number: 1,
            start_time_ticks: 90000,
            duration_ticks: 180000,
            duration_secs: 2.0,
            chunks: vec![],
            finalized_at: SystemTime::now(),
        };
        let entry = segment.to_timeline_s();
        assert!(entry.contains("t=\"90000\""));
        assert!(entry.contains("d=\"180000\""));
    }

    #[test]
    fn test_segment_total_bytes() {
        let only_chunk = DashChunk {
            segment_number: 1,
            chunk_index: 0,
            start_time_ticks: 0,
            duration_ticks: 45000,
            duration_secs: 0.5,
            is_independent: true,
            is_last: true,
            data: vec![0u8; 2048],
            produced_at: SystemTime::now(),
            byte_offset: 0,
        };
        let segment = CompletedDashSegment {
            number: 1,
            start_time_ticks: 0,
            duration_ticks: 45000,
            duration_secs: 0.5,
            chunks: vec![only_chunk],
            finalized_at: SystemTime::now(),
        };
        assert_eq!(segment.total_bytes(), 2048);
    }

    #[test]
    fn test_mpd_contains_ato() {
        let config = LlDashChunkerConfig {
            availability_time_offset: 1.5,
            ..LlDashChunkerConfig::default()
        };
        let mut chunker = LlDashChunker::new(config);
        feed_periodic(&mut chunker, 8, &[0u8; 256], 20, 4);
        let mpd = chunker.generate_mpd();
        assert!(mpd.contains("availabilityTimeOffset=\"1.500\""));
    }

    #[test]
    fn test_mpd_xml_wellformed() {
        let mut chunker = default_chunker();
        feed_periodic(&mut chunker, 8, &[0u8; 256], 20, 4);
        let mpd = chunker.generate_mpd();
        assert!(mpd.contains("<?xml"));
        assert!(mpd.contains("MPD"));
        assert!(mpd.contains("type=\"dynamic\""));
        assert!(mpd.contains("ServiceDescription"));
        assert!(mpd.contains("SegmentTimeline"));
    }

    #[test]
    fn test_segment_template_xml() {
        let config = LlDashChunkerConfig {
            availability_time_offset: 0.5,
            ..LlDashChunkerConfig::default()
        };
        let tag = segment_template_xml(&config);
        assert!(tag.contains("timescale=\"90000\""));
        assert!(tag.contains("availabilityTimeOffset=\"0.500\""));
    }

    #[test]
    fn test_debug_format() {
        let chunker = default_chunker();
        let rendered = format!("{chunker:?}");
        assert!(rendered.contains("LlDashChunker"));
    }

    #[test]
    fn test_byte_offset_monotonic() {
        let mut chunker = LlDashChunker::new(LlDashChunkerConfig::new(0.1, 5));
        feed_periodic(&mut chunker, 6, &[1u8; 512], 20, 2);
        let emitted = chunker.drain_chunks();
        if emitted.len() > 1 {
            for pair in emitted.windows(2) {
                assert!(
                    pair[1].byte_offset >= pair[0].byte_offset,
                    "byte offsets should increase"
                );
            }
        }
    }

    #[test]
    fn test_ato_accessor() {
        let config = LlDashChunkerConfig {
            availability_time_offset: 1.23,
            ..LlDashChunkerConfig::default()
        };
        let chunker = LlDashChunker::new(config);
        assert!((chunker.availability_time_offset() - 1.23).abs() < 1e-9);
    }
}