mod chunked;
mod dvr;
mod mpd_gen;
mod segment;
mod timeline;
pub use chunked::{Chunk, ChunkCoordinator, ChunkedConfig, ChunkedTransfer, ProducerReferenceTime};
pub use dvr::{DvrBuffer, DvrSegment, DvrStats};
pub use mpd_gen::{AdaptationSetBuilder, DynamicMpdGenerator, MpdConfig, RepresentationBuilder};
pub use segment::{
CodecInfo, GeneratedSegment, LiveSegmentGenerator, MultiRepresentationGenerator,
SegmentAlignment,
};
pub use timeline::{SegmentInfo, TimelineManager};
use crate::dash::mpd::{Representation, SegmentTemplate};
use crate::error::{NetError, NetResult};
use bytes::Bytes;
use oximedia_container::Packet;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
/// Tunable parameters for a live DASH server.
#[derive(Debug, Clone)]
pub struct DashLiveConfig {
    /// Segment duration handed to the segment generators and timelines;
    /// also used as the MPD minimum update period.
    pub segment_duration: Duration,
    /// Minimum buffer time forwarded to the MPD configuration.
    pub min_buffer_time: Duration,
    /// Depth of the time-shift window; forwarded to the MPD and used to size
    /// the per-representation DVR buffers.
    pub time_shift_buffer: Duration,
    /// When `true`, the server is started in low-latency mode.
    pub low_latency: bool,
}

impl Default for DashLiveConfig {
    /// Defaults: 2 s segments, 4 s minimum buffer, 60 s time-shift window,
    /// low-latency mode off.
    fn default() -> Self {
        let segment_duration = Duration::from_secs(2);
        let min_buffer_time = Duration::from_secs(4);
        let time_shift_buffer = Duration::from_secs(60);

        Self {
            segment_duration,
            min_buffer_time,
            time_shift_buffer,
            low_latency: false,
        }
    }
}
/// Live DASH server: accepts packets per representation, turns completed
/// segments into timeline/DVR entries, and serves the MPD, segments and chunks.
pub struct DashLiveServer {
/// Configuration supplied to `start`; read when representations are added
/// and when completed segments are processed.
config: DashLiveConfig,
/// All mutable server state, shared behind an async read/write lock.
state: Arc<RwLock<ServerState>>,
}
/// Mutable state guarded by the server's `RwLock`.
struct ServerState {
/// Generator for the dynamic MPD document.
mpd_generator: DynamicMpdGenerator,
/// Segment generators for every registered representation.
segment_generators: MultiRepresentationGenerator,
/// Segment timelines, keyed by representation id.
timelines: HashMap<String, TimelineManager>,
/// DVR (time-shift) buffers, keyed by representation id.
dvr_buffers: HashMap<String, DvrBuffer>,
/// Present only when the server was started in low-latency mode.
chunk_coordinator: Option<ChunkCoordinator>,
/// Wall-clock time captured when the server was started.
availability_start: SystemTime,
/// Id of the period created at startup.
current_period_id: String,
/// Metadata of every registered representation, keyed by representation id.
representations: HashMap<String, RepresentationMetadata>,
/// Per-representation segment counter: starts at 1 and is set to the number
/// of the last completed segment plus one.
segment_numbers: HashMap<String, u64>,
}
/// Static description of one representation, recorded when it is registered
/// and used to rebuild the MPD.
#[derive(Debug, Clone)]
struct RepresentationMetadata {
/// Representation id (same value as its key in the state maps).
id: String,
/// Bandwidth value passed to the MPD `Representation`.
bandwidth: u64,
/// Codec description (codec string, MIME type, video/audio flag).
codec: CodecInfo,
/// Timescale used for this representation's segment timing.
timescale: u32,
/// Adaptation set the representation belongs to: 0 for video, 1 for audio.
adaptation_set_id: u32,
/// Frame width; only written to the MPD for video representations.
width: Option<u32>,
/// Frame height; only written to the MPD for video representations.
height: Option<u32>,
}
impl DashLiveServer {
/// Creates a server from `config`.
///
/// Captures the current wall-clock time as the availability start, builds the
/// dynamic MPD generator (with a UTC timing source and one initial period),
/// and creates a chunk coordinator when low-latency mode is requested.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn start(config: DashLiveConfig) -> NetResult<Self> {
    let availability_start = SystemTime::now();

    // Low-latency streams advertise a shorter presentation delay.
    let suggested_presentation_delay =
        Duration::from_secs(if config.low_latency { 2 } else { 6 });

    let mut mpd_generator = DynamicMpdGenerator::new(MpdConfig {
        min_buffer_time: config.min_buffer_time,
        suggested_presentation_delay,
        time_shift_buffer_depth: config.time_shift_buffer,
        availability_start_time: availability_start,
        minimum_update_period: config.segment_duration,
    });
    mpd_generator.add_utc_timing(
        "urn:mpeg:dash:utc:http-iso:2014".to_string(),
        "https://time.akamai.com/?iso".to_string(),
    );
    let current_period_id = mpd_generator.add_period(None, None);

    // NOTE(review): the coordinator is always given a fixed 2 s duration,
    // independent of `config.segment_duration` — confirm this is intended.
    let chunk_coordinator = config
        .low_latency
        .then(|| ChunkCoordinator::new(Duration::from_secs(2)));

    let initial_state = ServerState {
        mpd_generator,
        segment_generators: MultiRepresentationGenerator::new(),
        timelines: HashMap::new(),
        dvr_buffers: HashMap::new(),
        chunk_coordinator,
        availability_start,
        current_period_id,
        representations: HashMap::new(),
        segment_numbers: HashMap::new(),
    };

    Ok(Self {
        config,
        state: Arc::new(RwLock::new(initial_state)),
    })
}
pub async fn add_representation(
&mut self,
id: impl Into<String>,
bandwidth: u64,
codec: CodecInfo,
timescale: u32,
width: Option<u32>,
height: Option<u32>,
) -> NetResult<()> {
let id = id.into();
let mut state = self.state.write().await;
let adaptation_set_id = if codec.is_video { 0 } else { 1 };
let segment_generator =
LiveSegmentGenerator::new(&id, timescale, self.config.segment_duration, codec.clone());
state
.segment_generators
.add_representation(segment_generator);
let timeline = TimelineManager::new(
timescale,
self.config.segment_duration,
state.availability_start,
);
state.timelines.insert(id.clone(), timeline);
let dvr_buffer = DvrBuffer::new(self.config.time_shift_buffer);
state.dvr_buffers.insert(id.clone(), dvr_buffer);
if let Some(ref mut coordinator) = state.chunk_coordinator {
coordinator.add_representation(id.clone(), ChunkedConfig::default());
}
let metadata = RepresentationMetadata {
id: id.clone(),
bandwidth,
codec: codec.clone(),
timescale,
adaptation_set_id,
width,
height,
};
state.representations.insert(id.clone(), metadata);
state.segment_numbers.insert(id.clone(), 1);
self.update_mpd_representations(&mut state).await?;
Ok(())
}
/// Feeds one packet to the segment generator of `representation_id`.
///
/// When the packet completes a segment, that segment is recorded in the
/// timeline, MPD and DVR buffer before returning.
///
/// # Errors
///
/// Propagates any error from processing a completed segment.
pub async fn ingest_packet(
    &mut self,
    representation_id: impl AsRef<str>,
    packet: Packet,
) -> NetResult<()> {
    let repr_id = representation_id.as_ref();
    let mut guard = self.state.write().await;

    let completed = guard.segment_generators.add_packet(repr_id, packet);
    match completed {
        Some(segment) => {
            self.handle_completed_segment(&mut guard, repr_id, segment)
                .await
        }
        // The packet did not close a segment; nothing more to do.
        None => Ok(()),
    }
}
/// Returns the current MPD document as XML.
///
/// Takes the write lock for the duration of the call (presumably because XML
/// generation mutates the generator — verify against `DynamicMpdGenerator`).
pub async fn get_mpd(&self) -> String {
    self.state.write().await.mpd_generator.generate_xml()
}
/// Returns the payload of segment `segment_number` from the DVR buffer of
/// `representation_id`.
///
/// # Errors
///
/// Returns a not-found error when the representation has no DVR buffer or
/// the buffer does not hold the requested segment.
pub async fn get_segment(
    &self,
    representation_id: &str,
    segment_number: u64,
) -> NetResult<Bytes> {
    let guard = self.state.read().await;

    let buffer = match guard.dvr_buffers.get(representation_id) {
        Some(buffer) => buffer,
        None => {
            return Err(NetError::not_found(format!(
                "Representation not found: {representation_id}"
            )))
        }
    };

    let payload = buffer
        .get_segment(segment_number, representation_id)
        .map(|found| found.data.clone());

    payload.ok_or_else(|| NetError::not_found(format!("Segment {segment_number} not found")))
}
/// Returns a copy of the initialization segment of `representation_id`.
///
/// # Errors
///
/// Returns a not-found error when the representation is unknown or its
/// initialization segment has not been generated yet.
pub async fn get_init_segment(&self, representation_id: &str) -> NetResult<Bytes> {
    let guard = self.state.read().await;

    let init = match guard.segment_generators.generator(representation_id) {
        Some(generator) => generator.init_segment().cloned(),
        None => {
            return Err(NetError::not_found(format!(
                "Representation not found: {representation_id}"
            )))
        }
    };

    init.ok_or_else(|| NetError::not_found("Initialization segment not generated"))
}
pub async fn generate_init_segments(&mut self) -> NetResult<()> {
let mut state = self.state.write().await;
let repr_ids: Vec<String> = state.representations.keys().cloned().collect();
for repr_id in repr_ids {
if let Some(generator) = state.segment_generators.generator_mut(&repr_id) {
generator.generate_init_segment(None);
}
}
Ok(())
}
/// Returns a copy of chunk `chunk_sequence` of `representation_id`
/// (low-latency mode only).
///
/// `_segment_number` is currently unused: the lookup is keyed only by the
/// representation and the chunk sequence.
///
/// # Errors
///
/// Returns an invalid-state error when low-latency mode is not enabled, and a
/// not-found error when the representation or the chunk is unknown.
pub async fn get_chunk(
    &self,
    representation_id: &str,
    _segment_number: u64,
    chunk_sequence: u32,
) -> NetResult<Chunk> {
    let guard = self.state.read().await;

    let transfer = guard
        .chunk_coordinator
        .as_ref()
        .ok_or_else(|| NetError::invalid_state("Low-latency mode not enabled"))?
        .transfer(representation_id)
        .ok_or_else(|| {
            NetError::not_found(format!("Representation not found: {representation_id}"))
        })?;

    let chunk = transfer.get_chunk(chunk_sequence).cloned();
    chunk.ok_or_else(|| NetError::not_found(format!("Chunk {chunk_sequence} not found")))
}
#[must_use]
pub async fn stats(&self) -> ServerStats {
let state = self.state.read().await;
let mut representation_stats = Vec::new();
for (repr_id, dvr_buffer) in &state.dvr_buffers {
let timeline = state.timelines.get(repr_id);
let current_segment = state.segment_numbers.get(repr_id).copied().unwrap_or(0);
representation_stats.push(RepresentationStats {
id: repr_id.clone(),
current_segment_number: current_segment,
buffered_segments: dvr_buffer.segment_count(),
buffer_size: dvr_buffer.total_size(),
current_time: timeline.map(|t| t.current_time_secs()).unwrap_or(0.0),
});
}
ServerStats {
uptime: SystemTime::now()
.duration_since(state.availability_start)
.unwrap_or(Duration::ZERO),
representation_count: state.representations.len(),
representations: representation_stats,
low_latency_enabled: state.chunk_coordinator.is_some(),
}
}
/// Finalizes every segment generator and records each resulting segment.
///
/// # Errors
///
/// Stops at, and returns, the first error raised while processing a
/// finalized segment.
pub async fn finalize_segments(&mut self) -> NetResult<()> {
    let mut guard = self.state.write().await;

    for (repr_id, segment) in guard.segment_generators.finalize_all() {
        self.handle_completed_segment(&mut guard, &repr_id, segment)
            .await?;
    }

    Ok(())
}
/// Records a completed segment for `representation_id`.
///
/// Appends the segment to the representation's timeline (trimming it to the
/// time-shift window and pushing the updated timeline into the MPD), stores
/// the segment in the DVR buffer, and advances the segment counter to
/// `segment.number + 1`.
///
/// Both timing values are validated before anything is modified, so a failed
/// call leaves the timeline, MPD and DVR buffer untouched.
///
/// # Errors
///
/// * not-found — `representation_id` has no registered metadata.
/// * invalid-state — the segment's start time or duration is negative,
///   non-finite, or too large for a `Duration`. These inputs previously
///   caused a panic while the state lock was held.
async fn handle_completed_segment(
    &self,
    state: &mut ServerState,
    representation_id: &str,
    segment: GeneratedSegment,
) -> NetResult<()> {
    // Copy the two scalars out so the metadata borrow ends immediately.
    let (timescale, adaptation_set_id) = state
        .representations
        .get(representation_id)
        .map(|metadata| (metadata.timescale, metadata.adaptation_set_id))
        .ok_or_else(|| {
            NetError::not_found(format!("Representation not found: {representation_id}"))
        })?;

    // Checked conversions: `Duration::from_secs_f64` panics on invalid input.
    let duration = Duration::try_from_secs_f64(segment.duration_secs(timescale))
        .map_err(|_| NetError::invalid_state("Segment duration is not a valid time span"))?;
    let start_time = Duration::try_from_secs_f64(segment.start_time_secs(timescale))
        .map_err(|_| NetError::invalid_state("Segment start time is not a valid time span"))?;

    if let Some(timeline) = state.timelines.get_mut(representation_id) {
        timeline.add_segment(duration);
        timeline.trim_old_segments(self.config.time_shift_buffer);
        state.mpd_generator.update_timeline(
            &state.current_period_id,
            adaptation_set_id,
            representation_id,
            timeline.to_segment_timeline(),
        );
    }

    if let Some(dvr_buffer) = state.dvr_buffers.get_mut(representation_id) {
        let dvr_segment = DvrSegment::new(
            segment.number,
            representation_id,
            segment.data.clone(),
            start_time,
            duration,
            timescale,
        );
        dvr_buffer.add_segment(dvr_segment);
    }

    state
        .segment_numbers
        .insert(representation_id.to_string(), segment.number + 1);

    Ok(())
}
async fn update_mpd_representations(&self, state: &mut ServerState) -> NetResult<()> {
let mut video_reprs = Vec::new();
let mut audio_reprs = Vec::new();
for metadata in state.representations.values() {
let mut repr = Representation::new(&metadata.id, metadata.bandwidth);
repr.codecs = Some(metadata.codec.codec.clone());
repr.mime_type = Some(metadata.codec.mime_type.clone());
if metadata.codec.is_video {
repr.width = metadata.width;
repr.height = metadata.height;
}
let template = SegmentTemplate::new(metadata.timescale)
.with_media("$RepresentationID$/$Number$.m4s")
.with_initialization("$RepresentationID$/init.mp4");
repr.segment_template = Some(template);
if metadata.codec.is_video {
video_reprs.push(repr);
} else {
audio_reprs.push(repr);
}
}
if let Some(period) = state.mpd_generator.current_period_mut() {
period.adaptation_sets.clear();
if !video_reprs.is_empty() {
let mut video_as = AdaptationSetBuilder::new()
.id(0)
.content_type("video")
.mime_type("video/mp4")
.segment_alignment(true)
.build();
video_as.representations = video_reprs;
period.adaptation_sets.push(video_as);
}
if !audio_reprs.is_empty() {
let mut audio_as = AdaptationSetBuilder::new()
.id(1)
.content_type("audio")
.mime_type("audio/mp4")
.segment_alignment(true)
.build();
audio_as.representations = audio_reprs;
period.adaptation_sets.push(audio_as);
}
}
Ok(())
}
}
/// Snapshot of server-wide statistics returned by `DashLiveServer::stats`.
#[derive(Debug, Clone)]
pub struct ServerStats {
/// Time elapsed since the availability start; zero if the system clock
/// reports a time before it.
pub uptime: Duration,
/// Number of registered representations.
pub representation_count: usize,
/// Per-representation statistics, one entry per DVR buffer.
pub representations: Vec<RepresentationStats>,
/// Whether a chunk coordinator exists, i.e. low-latency mode is enabled.
pub low_latency_enabled: bool,
}
/// Statistics for a single representation.
#[derive(Debug, Clone)]
pub struct RepresentationStats {
/// Representation id.
pub id: String,
/// Value of the representation's segment counter (last completed segment
/// number plus one), or 0 when no counter exists.
pub current_segment_number: u64,
/// Number of segments held in the DVR buffer.
pub buffered_segments: usize,
/// Total size reported by the DVR buffer (presumably bytes — confirm
/// against `DvrBuffer::total_size`).
pub buffer_size: u64,
/// Current timeline time in seconds, or 0.0 when no timeline exists.
pub current_time: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a server with `config`, panicking if startup fails.
    async fn started(config: DashLiveConfig) -> DashLiveServer {
        DashLiveServer::start(config)
            .await
            .expect("should succeed in test")
    }

    /// Starting with the default configuration succeeds.
    #[tokio::test]
    async fn test_server_creation() {
        let outcome = DashLiveServer::start(DashLiveConfig::default()).await;
        assert!(outcome.is_ok());
    }

    /// A registered representation is counted in the stats.
    #[tokio::test]
    async fn test_add_representation() {
        let mut server = started(DashLiveConfig::default()).await;

        let h264 = CodecInfo::h264(0x4d, 0x40);
        let added = server
            .add_representation("720p", 1_500_000, h264, 90000, Some(1280), Some(720))
            .await;
        assert!(added.is_ok());

        let snapshot = server.stats().await;
        assert_eq!(snapshot.representation_count, 1);
    }

    /// The generated manifest is XML and declares a dynamic presentation.
    #[tokio::test]
    async fn test_get_mpd() {
        let server = started(DashLiveConfig::default()).await;

        let manifest = server.get_mpd().await;
        assert!(manifest.contains("<?xml"));
        assert!(manifest.contains("type=\"dynamic\""));
    }

    /// Enabling low latency in the configuration is reflected in the stats.
    #[tokio::test]
    async fn test_low_latency_mode() {
        let low_latency_config = DashLiveConfig {
            low_latency: true,
            ..Default::default()
        };
        let server = started(low_latency_config).await;

        let snapshot = server.stats().await;
        assert!(snapshot.low_latency_enabled);
    }
}