#![forbid(unsafe_code)]
#[cfg(not(target_arch = "wasm32"))]
use async_trait::async_trait;
#[cfg(not(target_arch = "wasm32"))]
use oximedia_core::OxiResult;
use std::time::Duration;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
use crate::{Muxer, MuxerConfig, Packet, StreamInfo};
/// Configuration values for a streaming muxer.
///
/// Create one with [`StreamingMuxerConfig::new`] (or `Default`, which yields
/// the same values) and refine it with the chainable `with_*` methods.
#[derive(Clone, Debug)]
pub struct StreamingMuxerConfig {
    /// Target latency, in milliseconds. Initial value: `1000`.
    pub target_latency_ms: u64,
    /// Low-latency flag. Initial value: `false`.
    pub low_latency: bool,
    /// Fragment duration in milliseconds, when one has been set. Initial value: `None`.
    pub fragment_duration_ms: Option<u64>,
    /// Real-time flag. Initial value: `false`.
    pub realtime: bool,
}

impl Default for StreamingMuxerConfig {
    /// Same values as [`StreamingMuxerConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl StreamingMuxerConfig {
    /// Returns the baseline configuration: 1000 ms target latency, both flags
    /// off and no fragment duration.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            target_latency_ms: 1000,
            low_latency: false,
            fragment_duration_ms: None,
            realtime: false,
        }
    }

    /// Returns the configuration with `low_latency` replaced by `enabled`.
    #[must_use]
    pub const fn with_low_latency(self, enabled: bool) -> Self {
        Self {
            low_latency: enabled,
            ..self
        }
    }

    /// Returns the configuration with `target_latency_ms` replaced by `latency_ms`.
    #[must_use]
    pub const fn with_target_latency(self, latency_ms: u64) -> Self {
        Self {
            target_latency_ms: latency_ms,
            ..self
        }
    }

    /// Returns the configuration with `fragment_duration_ms` set to `Some(duration_ms)`.
    #[must_use]
    pub const fn with_fragment_duration(self, duration_ms: u64) -> Self {
        Self {
            fragment_duration_ms: Some(duration_ms),
            ..self
        }
    }

    /// Returns the configuration with `realtime` replaced by `enabled`.
    #[must_use]
    pub const fn with_realtime(self, enabled: bool) -> Self {
        Self {
            realtime: enabled,
            ..self
        }
    }
}
/// Wrapper around another [`Muxer`] that keeps packet/byte counters and
/// timestamps alongside it.
#[cfg(not(target_arch = "wasm32"))]
pub struct StreamingMuxer<M: Muxer> {
/// The wrapped muxer that all `Muxer` calls are forwarded to.
inner: M,
/// Streaming configuration supplied at construction; stored but not read by
/// any code visible in this file (hence the `dead_code` allowance).
#[allow(dead_code)]
streaming_config: StreamingMuxerConfig,
/// Packet counter maintained by `write_packet`.
packets_written: u64,
/// Byte counter maintained by `write_packet` (sum of `Packet::size()`).
bytes_written: u64,
/// Instant captured by `write_header`; `None` until then.
start_time: Option<Instant>,
/// Instant captured by `write_packet`; `None` until then.
last_packet_time: Option<Instant>,
}
#[cfg(not(target_arch = "wasm32"))]
impl<M: Muxer> StreamingMuxer<M> {
    /// Wraps `inner` with the configuration from [`StreamingMuxerConfig::new`].
    pub const fn new(inner: M) -> Self {
        Self::with_config(inner, StreamingMuxerConfig::new())
    }

    /// Wraps `inner` with an explicit streaming configuration.
    ///
    /// Counters start at zero and both timestamps start unset.
    pub const fn with_config(inner: M, streaming_config: StreamingMuxerConfig) -> Self {
        Self {
            start_time: None,
            last_packet_time: None,
            packets_written: 0,
            bytes_written: 0,
            streaming_config,
            inner,
        }
    }

    /// Current value of the packet counter.
    #[must_use]
    pub const fn packets_written(&self) -> u64 {
        self.packets_written
    }

    /// Current value of the byte counter.
    #[must_use]
    pub const fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Time elapsed since the start instant was recorded, or `None` when no
    /// start instant has been recorded yet.
    #[must_use]
    pub fn elapsed(&self) -> Option<Duration> {
        let started = self.start_time?;
        Some(started.elapsed())
    }

    /// Shared access to the wrapped muxer.
    #[must_use]
    pub const fn inner(&self) -> &M {
        &self.inner
    }

    /// Exclusive access to the wrapped muxer.
    pub fn inner_mut(&mut self) -> &mut M {
        &mut self.inner
    }

    /// Consumes the wrapper and hands back the wrapped muxer.
    #[must_use]
    pub fn into_inner(self) -> M {
        self.inner
    }
}
#[cfg(not(target_arch = "wasm32"))]
#[async_trait]
impl<M: Muxer> Muxer for StreamingMuxer<M> {
    /// Forwards stream registration to the wrapped muxer.
    fn add_stream(&mut self, info: StreamInfo) -> OxiResult<usize> {
        self.inner.add_stream(info)
    }

    /// Records the start instant, then writes the header through the wrapped
    /// muxer.
    async fn write_header(&mut self) -> OxiResult<()> {
        self.start_time = Some(Instant::now());
        self.inner.write_header().await
    }

    /// Writes `packet` through the wrapped muxer and updates the statistics.
    ///
    /// The counters and the last-packet timestamp are only updated once the
    /// wrapped muxer reports success, so `packets_written` / `bytes_written`
    /// never include packets whose write failed.
    ///
    /// # Errors
    ///
    /// Returns the wrapped muxer's error unchanged; statistics are left
    /// untouched in that case.
    async fn write_packet(&mut self, packet: &Packet) -> OxiResult<()> {
        self.inner.write_packet(packet).await?;
        self.last_packet_time = Some(Instant::now());
        self.packets_written += 1;
        self.bytes_written += packet.size() as u64;
        Ok(())
    }

    /// Forwards trailer writing to the wrapped muxer.
    async fn write_trailer(&mut self) -> OxiResult<()> {
        self.inner.write_trailer().await
    }

    /// Streams known to the wrapped muxer.
    fn streams(&self) -> &[StreamInfo] {
        self.inner.streams()
    }

    /// Configuration of the wrapped muxer.
    fn config(&self) -> &MuxerConfig {
        self.inner.config()
    }
}
/// Sending half of the unbounded packet channel created by `spawn_muxer`.
#[cfg(not(target_arch = "wasm32"))]
pub struct PacketSender {
    tx: mpsc::UnboundedSender<Packet>,
}

#[cfg(not(target_arch = "wasm32"))]
impl PacketSender {
    /// Wraps the channel's sending half.
    const fn new(tx: mpsc::UnboundedSender<Packet>) -> Self {
        PacketSender { tx }
    }

    /// Queues `packet` on the channel.
    ///
    /// # Errors
    ///
    /// Returns the packet inside a `SendError` when the underlying channel
    /// rejects it.
    pub fn send(&self, packet: Packet) -> Result<(), mpsc::error::SendError<Packet>> {
        self.tx.send(packet)
    }

    /// Identical to [`PacketSender::send`]: the channel is unbounded, so both
    /// entry points perform the same non-blocking send.
    ///
    /// # Errors
    ///
    /// Same as [`PacketSender::send`].
    pub fn try_send(&self, packet: Packet) -> Result<(), mpsc::error::SendError<Packet>> {
        self.send(packet)
    }
}
/// Writes the header on `muxer`, then moves it into a background Tokio task
/// fed by an unbounded channel, returning the sending half.
///
/// The task writes every received packet; it stops at the first write error
/// or when the channel yields `None`, and then attempts to write the trailer.
/// Write and trailer errors inside the task are discarded, and the task's
/// join handle is not returned.
///
/// # Errors
///
/// Returns the error from `write_header`; in that case no task is spawned.
#[cfg(not(target_arch = "wasm32"))]
pub async fn spawn_muxer<M: Muxer + Send + 'static>(mut muxer: M) -> OxiResult<PacketSender> {
    muxer.write_header().await?;

    let (sender, mut receiver) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        while let Some(packet) = receiver.recv().await {
            let outcome = muxer.write_packet(&packet).await;
            if outcome.is_err() {
                break;
            }
        }
        // Best effort: the trailer is attempted even after a failed write.
        let _ = muxer.write_trailer().await;
    });

    Ok(PacketSender::new(sender))
}
/// Running totals and bitrate figures for a muxing session.
#[derive(Debug, Clone, Copy, Default)]
pub struct MuxingStats {
    /// Number of `update` calls so far.
    pub packets_written: u64,
    /// Sum of the packet sizes passed to `update`.
    pub bytes_written: u64,
    /// `bytes_written * 8 / duration_secs`, refreshed by `update` whenever the
    /// supplied duration is positive.
    pub avg_bitrate: f64,
    /// Value last stored through `set_current_bitrate`.
    pub current_bitrate: f64,
    /// Duration passed to the most recent `update` call.
    pub duration_secs: f64,
}

impl MuxingStats {
    /// Returns statistics with every field at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            packets_written: 0,
            bytes_written: 0,
            avg_bitrate: 0.0,
            current_bitrate: 0.0,
            duration_secs: 0.0,
        }
    }

    /// Accounts for one more packet of `packet_size` bytes and stores
    /// `duration_secs` as the current duration.
    ///
    /// The average bitrate is recomputed only when `duration_secs > 0.0`;
    /// otherwise the previous average is kept.
    pub fn update(&mut self, packet_size: usize, duration_secs: f64) {
        self.duration_secs = duration_secs;
        self.packets_written += 1;
        self.bytes_written += packet_size as u64;

        if duration_secs > 0.0 {
            #[allow(clippy::cast_precision_loss)]
            let total_bits = self.bytes_written as f64 * 8.0;
            self.avg_bitrate = total_bits / duration_secs;
        }
    }

    /// Overwrites the current-bitrate figure.
    pub fn set_current_bitrate(&mut self, bitrate: f64) {
        self.current_bitrate = bitrate;
    }
}
/// Sliding-window latency tracker that compares the window average against a
/// target.
#[derive(Debug)]
pub struct LatencyMonitor {
    /// Latency the average is compared against.
    target_latency: Duration,
    /// Most recent measurements, oldest at the front. A deque is used so that
    /// evicting the oldest entry is O(1).
    measurements: std::collections::VecDeque<Duration>,
    /// Maximum number of measurements kept in the window.
    max_measurements: usize,
}

impl LatencyMonitor {
    /// Creates a monitor with the given target and a window of 100
    /// measurements.
    #[must_use]
    pub fn new(target_latency: Duration) -> Self {
        const WINDOW: usize = 100;
        Self {
            target_latency,
            measurements: std::collections::VecDeque::with_capacity(WINDOW),
            max_measurements: WINDOW,
        }
    }

    /// Adds a measurement, evicting the oldest one when the window is full.
    pub fn record(&mut self, latency: Duration) {
        if self.measurements.len() >= self.max_measurements {
            self.measurements.pop_front();
        }
        self.measurements.push_back(latency);
    }

    /// Mean of the measurements currently in the window, or `None` when no
    /// measurement has been recorded.
    ///
    /// The sum saturates at `Duration::MAX` instead of panicking on overflow.
    #[must_use]
    pub fn average_latency(&self) -> Option<Duration> {
        // `filter` rejects an empty window so the division below cannot be by zero.
        let count = u32::try_from(self.measurements.len())
            .ok()
            .filter(|&n| n > 0)?;
        let total = self
            .measurements
            .iter()
            .fold(Duration::ZERO, |acc, &sample| acc.saturating_add(sample));
        Some(total / count)
    }

    /// `true` when the window average does not exceed the target, and also
    /// when nothing has been recorded yet.
    #[must_use]
    pub fn is_within_target(&self) -> bool {
        self.average_latency()
            .map_or(true, |avg| avg <= self.target_latency)
    }

    /// The target latency supplied at construction.
    #[must_use]
    pub const fn target_latency(&self) -> Duration {
        self.target_latency
    }
}
/// Chunk emission strategy for the chunked CMAF encoder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CmafChunkMode {
    /// Samples accumulate until the segment is flushed. This is the default.
    #[default]
    Standard,
    /// A chunk is emitted once the configured duration or sample-count
    /// threshold is reached.
    Chunked,
    /// A chunk is emitted for every pushed sample.
    LowLatencyChunked,
}
/// Settings for the chunked CMAF encoder.
#[derive(Debug, Clone)]
pub struct CmafChunkedConfig {
    /// Chunk emission strategy. Initial value: `CmafChunkMode::Standard`.
    pub mode: CmafChunkMode,
    /// Chunk duration threshold in milliseconds. Initial value: `500`.
    pub chunk_duration_ms: u32,
    /// Sample-count threshold per chunk. Initial value: `5`.
    pub max_samples_per_chunk: u32,
    /// `mfra` flag; not read by any code visible in this file. Initial value: `false`.
    pub include_mfra: bool,
    /// Low-latency signalling flag; not read by any code visible in this file.
    /// Initial value: `false`.
    pub signal_low_latency: bool,
    /// Optional part target duration in milliseconds; not read by any code
    /// visible in this file. Initial value: `None`.
    pub part_target_duration_ms: Option<u32>,
}

impl Default for CmafChunkedConfig {
    /// Same values as [`CmafChunkedConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl CmafChunkedConfig {
    /// Returns the baseline configuration: standard mode, 500 ms chunks, at
    /// most 5 samples per chunk, every flag off and no part target duration.
    #[must_use]
    pub fn new() -> Self {
        Self {
            mode: CmafChunkMode::Standard,
            chunk_duration_ms: 500,
            max_samples_per_chunk: 5,
            include_mfra: false,
            signal_low_latency: false,
            part_target_duration_ms: None,
        }
    }

    /// Returns the configuration with `mode` replaced.
    #[must_use]
    pub const fn with_mode(self, mode: CmafChunkMode) -> Self {
        Self { mode, ..self }
    }

    /// Returns the configuration with `chunk_duration_ms` replaced by `ms`.
    #[must_use]
    pub const fn with_chunk_duration_ms(self, ms: u32) -> Self {
        Self {
            chunk_duration_ms: ms,
            ..self
        }
    }

    /// Returns the configuration with `max_samples_per_chunk` replaced by `max`.
    #[must_use]
    pub const fn with_max_samples_per_chunk(self, max: u32) -> Self {
        Self {
            max_samples_per_chunk: max,
            ..self
        }
    }

    /// Returns the configuration with `include_mfra` replaced by `include`.
    #[must_use]
    pub const fn with_mfra(self, include: bool) -> Self {
        Self {
            include_mfra: include,
            ..self
        }
    }

    /// Returns the configuration with `signal_low_latency` replaced by `signal`.
    #[must_use]
    pub const fn with_low_latency_signal(self, signal: bool) -> Self {
        Self {
            signal_low_latency: signal,
            ..self
        }
    }

    /// Returns the configuration with `part_target_duration_ms` set to `Some(ms)`.
    #[must_use]
    pub const fn with_part_target_duration_ms(self, ms: u32) -> Self {
        Self {
            part_target_duration_ms: Some(ms),
            ..self
        }
    }
}
/// One chunk produced by `CmafChunkedEncoder`, together with its metadata.
#[derive(Debug, Clone)]
pub struct CmafChunk {
/// Encoder-wide chunk sequence number at emission time.
pub sequence_number: u32,
/// Segment sequence number the chunk was emitted under.
pub segment_sequence: u32,
/// Position of the chunk within its segment, starting at 0.
pub chunk_index: u32,
/// Timestamp of the first sample in the chunk.
/// NOTE(review): the encoder fills this from the first sample's `dts`, not
/// its `pts` — confirm that is intended.
pub start_pts: u64,
/// Sum of the `duration` values of the samples in the chunk.
pub duration: u64,
/// Number of samples in the chunk.
pub sample_count: u32,
/// Length of `data` in bytes.
pub data_size: usize,
/// Whether the first sample in the chunk is a keyframe.
pub starts_with_keyframe: bool,
/// Set when the chunk was marked as the final one of its segment by
/// `flush_segment`.
pub is_last_in_segment: bool,
/// Serialized chunk bytes (`moof` box followed by `mdat` box).
pub data: Vec<u8>,
/// Filled with the same value as `starts_with_keyframe` by the encoder.
pub is_independent: bool,
}
/// A single media sample handed to `CmafChunkedEncoder::push_sample`.
#[derive(Debug, Clone)]
pub struct ChunkSample {
/// Presentation timestamp; not read by the encoder code visible in this file.
pub pts: u64,
/// Decode timestamp; the first sample's value is used as the chunk's base
/// decode time.
pub dts: u64,
/// Sample duration, in timescale ticks (it is accumulated and compared
/// against a tick threshold derived from the encoder timescale).
pub duration: u32,
/// Sample payload, copied into the chunk's `mdat`.
pub data: Vec<u8>,
/// Whether the sample is a keyframe.
pub is_keyframe: bool,
/// Track identifier; the first sample's value is written for the whole chunk.
pub track_id: u32,
}
/// Groups pushed samples into `moof`+`mdat` chunks according to a
/// `CmafChunkedConfig`.
#[derive(Debug)]
pub struct CmafChunkedEncoder {
/// Chunking settings supplied at construction.
config: CmafChunkedConfig,
/// Timescale (ticks per second) used to convert the chunk duration from
/// milliseconds to ticks.
timescale: u32,
/// Samples received since the last chunk was emitted.
pending_samples: Vec<ChunkSample>,
/// Sum of the durations of `pending_samples`.
pending_duration: u64,
/// Emitted chunks not yet handed out to the caller.
completed_chunks: Vec<CmafChunk>,
/// Sequence number for the next chunk; starts at 1.
chunk_sequence: u32,
/// Sequence number of the current segment; starts at 1.
segment_sequence: u32,
/// Index the next chunk gets within the current segment; reset on flush.
chunk_index_in_segment: u32,
/// Total bytes of chunk data produced so far.
total_bytes_produced: u64,
/// Total number of chunks produced so far.
total_chunks_produced: u64,
}
impl CmafChunkedEncoder {
    /// Creates an encoder with no pending samples; chunk and segment sequence
    /// numbers both start at 1.
    #[must_use]
    pub fn new(config: CmafChunkedConfig, timescale: u32) -> Self {
        Self {
            config,
            timescale,
            pending_samples: Vec::new(),
            pending_duration: 0,
            completed_chunks: Vec::new(),
            chunk_sequence: 1,
            segment_sequence: 1,
            chunk_index_in_segment: 0,
            total_bytes_produced: 0,
            total_chunks_produced: 0,
        }
    }

    /// Timescale supplied at construction.
    #[must_use]
    pub const fn timescale(&self) -> u32 {
        self.timescale
    }

    /// Number of chunks emitted since construction.
    #[must_use]
    pub const fn total_chunks_produced(&self) -> u64 {
        self.total_chunks_produced
    }

    /// Number of chunk bytes emitted since construction.
    #[must_use]
    pub const fn total_bytes_produced(&self) -> u64 {
        self.total_bytes_produced
    }

    /// Sequence number of the segment currently being filled.
    #[must_use]
    pub const fn current_segment_sequence(&self) -> u32 {
        self.segment_sequence
    }

    /// Number of samples waiting to be emitted as a chunk.
    #[must_use]
    pub fn pending_sample_count(&self) -> usize {
        self.pending_samples.len()
    }

    /// Number of emitted chunks that have not been handed out yet.
    #[must_use]
    pub fn available_chunks(&self) -> usize {
        self.completed_chunks.len()
    }

    /// Queues `sample` and, depending on the configured mode, emits a chunk.
    ///
    /// * `Standard`: nothing is emitted until [`Self::flush_segment`].
    /// * `Chunked`: a chunk is emitted once the duration or sample-count
    ///   threshold is reached.
    /// * `LowLatencyChunked`: a chunk is emitted immediately.
    pub fn push_sample(&mut self, sample: ChunkSample) {
        self.pending_duration += u64::from(sample.duration);
        self.pending_samples.push(sample);

        match self.config.mode {
            CmafChunkMode::Standard => {}
            CmafChunkMode::Chunked => self.try_emit_chunk_by_duration(),
            CmafChunkMode::LowLatencyChunked => self.emit_current_chunk(false),
        }
    }

    /// Emits the pending samples when their total duration reaches the
    /// configured chunk duration (converted to ticks) or when the sample-count
    /// limit is reached. With a timescale of 0 the tick threshold is 0, so
    /// every call emits.
    fn try_emit_chunk_by_duration(&mut self) {
        let target_ticks = if self.timescale == 0 {
            0
        } else {
            u64::from(self.config.chunk_duration_ms) * u64::from(self.timescale) / 1000
        };
        let duration_reached = self.pending_duration >= target_ticks;
        let count_reached =
            self.pending_samples.len() >= self.config.max_samples_per_chunk as usize;
        if duration_reached || count_reached {
            self.emit_current_chunk(false);
        }
    }

    /// Turns the pending samples into a completed chunk and advances the
    /// counters. Does nothing when no samples are pending.
    fn emit_current_chunk(&mut self, is_last: bool) {
        // NOTE(review): `start_pts` is taken from the first sample's `dts`;
        // kept as-is, confirm whether `pts` was intended.
        let (start_pts, starts_with_keyframe) = match self.pending_samples.first() {
            Some(first) => (first.dts, first.is_keyframe),
            None => return,
        };
        let sample_count = self.pending_samples.len() as u32;
        let duration = self.pending_duration;
        let chunk_data = self.build_chunk_boxes();
        let data_size = chunk_data.len();

        self.completed_chunks.push(CmafChunk {
            sequence_number: self.chunk_sequence,
            segment_sequence: self.segment_sequence,
            chunk_index: self.chunk_index_in_segment,
            start_pts,
            duration,
            sample_count,
            data_size,
            starts_with_keyframe,
            is_last_in_segment: is_last,
            data: chunk_data,
            is_independent: starts_with_keyframe,
        });

        self.chunk_sequence += 1;
        self.chunk_index_in_segment += 1;
        self.total_chunks_produced += 1;
        self.total_bytes_produced += data_size as u64;
        self.pending_samples.clear();
        self.pending_duration = 0;
    }

    /// Serializes the pending samples as a `moof` box followed by an `mdat`
    /// box. Returns an empty vector when nothing is pending.
    ///
    /// The `trun` box advertises a data offset (flag `0x000001`) and `tfhd`
    /// uses default-base-is-moof (`0x020000`), so the offset must be the
    /// distance from the first byte of `moof` to the first sample byte inside
    /// `mdat`. The `moof` is therefore assembled twice: once with a
    /// placeholder to learn its length, then with the real offset. The offset
    /// field has a fixed width, so both passes have the same length.
    fn build_chunk_boxes(&self) -> Vec<u8> {
        use crate::mux::cmaf::{write_box, write_full_box, write_u32_be, write_u64_be};

        let (track_id, base_dts) = match self.pending_samples.first() {
            Some(first) => (first.track_id, first.dts),
            None => return Vec::new(),
        };

        let payload_len: usize = self.pending_samples.iter().map(|s| s.data.len()).sum();
        let mut mdat_payload: Vec<u8> = Vec::with_capacity(payload_len);
        for sample in &self.pending_samples {
            mdat_payload.extend_from_slice(&sample.data);
        }
        let mdat = write_box(b"mdat", &mdat_payload);
        // Header size is measured rather than assumed.
        let mdat_header_len = mdat.len().saturating_sub(mdat_payload.len());

        // data-offset-present | sample-duration-present | sample-size-present
        let trun_flags: u32 = 0x000301;

        let assemble_moof = |data_offset: u32| {
            let mut trun_content: Vec<u8> = Vec::new();
            trun_content.extend_from_slice(&write_u32_be(self.pending_samples.len() as u32));
            trun_content.extend_from_slice(&write_u32_be(data_offset));
            for sample in &self.pending_samples {
                trun_content.extend_from_slice(&write_u32_be(sample.duration));
                trun_content.extend_from_slice(&write_u32_be(sample.data.len() as u32));
            }
            let trun = write_full_box(b"trun", 0, trun_flags, &trun_content);

            let mut tfhd_content: Vec<u8> = Vec::new();
            tfhd_content.extend_from_slice(&write_u32_be(track_id));
            let tfhd = write_full_box(b"tfhd", 0, 0x020000, &tfhd_content);

            // Version 1 `tfdt` carries a 64-bit base decode time.
            let mut tfdt_content: Vec<u8> = Vec::new();
            tfdt_content.extend_from_slice(&write_u64_be(base_dts));
            let tfdt = write_full_box(b"tfdt", 1, 0, &tfdt_content);

            let mut traf_content: Vec<u8> = Vec::new();
            traf_content.extend(tfhd);
            traf_content.extend(tfdt);
            traf_content.extend(trun);
            let traf = write_box(b"traf", &traf_content);

            let mut mfhd_content: Vec<u8> = Vec::new();
            mfhd_content.extend_from_slice(&write_u32_be(self.chunk_sequence));
            let mfhd = write_full_box(b"mfhd", 0, 0, &mfhd_content);

            let mut moof_content: Vec<u8> = Vec::new();
            moof_content.extend(mfhd);
            moof_content.extend(traf);
            write_box(b"moof", &moof_content)
        };

        // First pass only measures the `moof`; second pass writes the offset.
        let measured = assemble_moof(0);
        let data_offset =
            u32::try_from(measured.len() + mdat_header_len).unwrap_or(u32::MAX);
        let moof = assemble_moof(data_offset);
        debug_assert_eq!(moof.len(), measured.len());

        let mut output = Vec::with_capacity(moof.len() + mdat.len());
        output.extend(moof);
        output.extend(mdat);
        output
    }

    /// Ends the current segment and returns every chunk not yet handed out.
    ///
    /// Pending samples are emitted as a final chunk flagged
    /// `is_last_in_segment`; when nothing is pending, the most recent
    /// still-queued chunk (if any) receives the flag instead. The segment
    /// sequence is advanced and the in-segment chunk index is reset even when
    /// no chunks are returned.
    pub fn flush_segment(&mut self) -> Vec<CmafChunk> {
        if !self.pending_samples.is_empty() {
            self.emit_current_chunk(true);
        } else if let Some(last) = self.completed_chunks.last_mut() {
            last.is_last_in_segment = true;
        }
        let chunks = std::mem::take(&mut self.completed_chunks);
        self.segment_sequence += 1;
        self.chunk_index_in_segment = 0;
        chunks
    }

    /// Removes and returns the oldest queued chunk, if any.
    pub fn pop_chunk(&mut self) -> Option<CmafChunk> {
        if self.completed_chunks.is_empty() {
            None
        } else {
            Some(self.completed_chunks.remove(0))
        }
    }

    /// Removes and returns all queued chunks without ending the segment.
    pub fn drain_chunks(&mut self) -> Vec<CmafChunk> {
        std::mem::take(&mut self.completed_chunks)
    }
}
/// A single media sample handed to `CmafChunkWriter::write_sample`.
#[derive(Debug, Clone)]
pub struct CmafSample {
    /// Presentation timestamp; may be negative.
    pub pts: i64,
    /// Sample duration, in the same units as `pts`.
    pub duration: u32,
    /// Whether the sample is a keyframe.
    pub is_keyframe: bool,
    /// Sample payload bytes.
    pub data: Vec<u8>,
}

impl CmafSample {
    /// Bundles the given values into a sample; no validation is performed.
    #[must_use]
    pub fn new(pts: i64, duration: u32, is_keyframe: bool, data: Vec<u8>) -> Self {
        CmafSample { data, is_keyframe, duration, pts }
    }
}
/// A chunk produced by `CmafChunkWriter`, owning its serialized bytes.
#[derive(Debug, Clone)]
pub struct CmafChunkOwned {
/// Serialized chunk bytes (`moof` box followed by `mdat` box).
pub data: Vec<u8>,
/// Zero-based index of the chunk, counted across the writer's lifetime.
pub chunk_idx: u64,
/// `pts` of the sample the chunk was built from.
pub pts_start: i64,
/// `pts_start` plus the sample duration (saturating).
pub pts_end: i64,
/// Keyframe flag copied from the sample.
pub is_keyframe: bool,
}
/// The chunks collected by `CmafChunkWriter` between two segment flushes.
#[derive(Debug, Clone)]
pub struct CmafSegment {
/// Chunks in the order they were written.
pub chunks: Vec<CmafChunkOwned>,
/// Index of the segment; the writer numbers segments starting at 1.
pub segment_idx: u64,
}
/// Writer that turns every sample into its own chunk and groups the chunks
/// into segments on demand.
#[derive(Debug)]
pub struct CmafChunkWriter {
/// Chunk duration in milliseconds as supplied to `new`; not read by any code
/// visible in this file.
pub chunk_duration_ms: u32,
/// Chunks written since the last segment flush.
current_chunk: Vec<CmafChunkOwned>,
/// Number of chunks built so far; also the index given to the next chunk.
chunk_count: u64,
/// Index the next flushed segment will receive; starts at 1.
segment_count: u64,
}
impl CmafChunkWriter {
    /// Creates a writer with no buffered chunks; segment numbering starts at 1.
    #[must_use]
    pub fn new(chunk_duration_ms: u32) -> Self {
        Self {
            chunk_duration_ms,
            current_chunk: Vec::new(),
            chunk_count: 0,
            segment_count: 1,
        }
    }

    /// Builds a chunk from `sample`, remembers a copy for the current segment
    /// and returns it. Always returns `Some`.
    pub fn write_sample(&mut self, sample: &CmafSample) -> Option<CmafChunkOwned> {
        let chunk = self.build_chunk(sample);
        self.current_chunk.push(chunk.clone());
        Some(chunk)
    }

    /// No-op that always returns `None`: every sample is emitted as a chunk by
    /// [`Self::write_sample`], so there is never a partial chunk to flush.
    pub fn flush_chunk(&mut self) -> Option<CmafChunkOwned> {
        None
    }

    /// Returns the chunks written since the previous flush as a segment and
    /// advances the segment counter, or `None` when no chunk was written.
    pub fn flush_segment(&mut self) -> Option<CmafSegment> {
        if self.current_chunk.is_empty() {
            return None;
        }
        let segment = CmafSegment {
            chunks: std::mem::take(&mut self.current_chunk),
            segment_idx: self.segment_count,
        };
        self.segment_count += 1;
        Some(segment)
    }

    /// Number of chunks built so far.
    #[must_use]
    pub fn chunk_count(&self) -> u64 {
        self.chunk_count
    }

    /// Index the next flushed segment will receive.
    #[must_use]
    pub fn segment_count(&self) -> u64 {
        self.segment_count
    }

    /// Serializes `sample` as a single-sample `moof` + `mdat` chunk on track 1.
    ///
    /// The `mfhd` sequence number is the chunk index plus one (wrapping in
    /// 32 bits). A negative `pts` is written as a base decode time of 0, while
    /// `pts_start` / `pts_end` keep the original signed values.
    ///
    /// The `trun` box advertises a data offset (flag `0x0000_0001`) and `tfhd`
    /// uses default-base-is-moof (`0x0002_0000`), so the offset must be the
    /// distance from the first byte of `moof` to the first sample byte inside
    /// `mdat`. The `moof` is therefore assembled twice: once with a
    /// placeholder to learn its length, then with the real offset. The offset
    /// field has a fixed width, so both passes have the same length.
    fn build_chunk(&mut self, sample: &CmafSample) -> CmafChunkOwned {
        use crate::mux::cmaf::{write_box, write_full_box, write_u32_be, write_u64_be};

        let chunk_idx = self.chunk_count;
        self.chunk_count += 1;
        let sequence_number = (chunk_idx as u32).wrapping_add(1);
        let track_id: u32 = 1;

        let mdat = write_box(b"mdat", &sample.data);
        // Header size is measured rather than assumed.
        let mdat_header_len = mdat.len().saturating_sub(sample.data.len());

        // Negative timestamps cannot be represented in `tfdt`; clamp to 0.
        let dts: u64 = u64::try_from(sample.pts).unwrap_or(0);

        // data-offset-present | sample-duration-present | sample-size-present
        let trun_flags: u32 = 0x0000_0301;

        let assemble_moof = |data_offset: u32| {
            let mut trun_content: Vec<u8> = Vec::new();
            trun_content.extend_from_slice(&write_u32_be(1_u32));
            trun_content.extend_from_slice(&write_u32_be(data_offset));
            trun_content.extend_from_slice(&write_u32_be(sample.duration));
            trun_content.extend_from_slice(&write_u32_be(sample.data.len() as u32));
            let trun = write_full_box(b"trun", 0, trun_flags, &trun_content);

            let mut tfhd_content: Vec<u8> = Vec::new();
            tfhd_content.extend_from_slice(&write_u32_be(track_id));
            let tfhd = write_full_box(b"tfhd", 0, 0x0002_0000, &tfhd_content);

            // Version 1 `tfdt` carries a 64-bit base decode time.
            let mut tfdt_content: Vec<u8> = Vec::new();
            tfdt_content.extend_from_slice(&write_u64_be(dts));
            let tfdt = write_full_box(b"tfdt", 1, 0, &tfdt_content);

            let mut traf_content: Vec<u8> = Vec::new();
            traf_content.extend_from_slice(&tfhd);
            traf_content.extend_from_slice(&tfdt);
            traf_content.extend_from_slice(&trun);
            let traf = write_box(b"traf", &traf_content);

            let mut mfhd_content: Vec<u8> = Vec::new();
            mfhd_content.extend_from_slice(&write_u32_be(sequence_number));
            let mfhd = write_full_box(b"mfhd", 0, 0, &mfhd_content);

            let mut moof_content: Vec<u8> = Vec::new();
            moof_content.extend_from_slice(&mfhd);
            moof_content.extend_from_slice(&traf);
            write_box(b"moof", &moof_content)
        };

        // First pass only measures the `moof`; second pass writes the offset.
        let measured = assemble_moof(0);
        let data_offset =
            u32::try_from(measured.len() + mdat_header_len).unwrap_or(u32::MAX);
        let moof = assemble_moof(data_offset);
        debug_assert_eq!(moof.len(), measured.len());

        let mut data = Vec::with_capacity(moof.len() + mdat.len());
        data.extend_from_slice(&moof);
        data.extend_from_slice(&mdat);

        let pts_end = sample.pts.saturating_add(i64::from(sample.duration));
        CmafChunkOwned {
            data,
            chunk_idx,
            pts_start: sample.pts,
            pts_end,
            is_keyframe: sample.is_keyframe,
        }
    }
}
// Unit tests for the configuration builders, the statistics helpers, the
// chunked encoder and the per-sample chunk writer defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// --- StreamingMuxerConfig ---
#[test]
fn test_streaming_config_default() {
let config = StreamingMuxerConfig::default();
assert_eq!(config.target_latency_ms, 1000);
assert!(!config.low_latency);
assert!(config.fragment_duration_ms.is_none());
assert!(!config.realtime);
}
#[test]
fn test_streaming_config_builder() {
let config = StreamingMuxerConfig::new()
.with_low_latency(true)
.with_target_latency(500)
.with_fragment_duration(2000)
.with_realtime(true);
assert!(config.low_latency);
assert_eq!(config.target_latency_ms, 500);
assert_eq!(config.fragment_duration_ms, Some(2000));
assert!(config.realtime);
}
// --- MuxingStats / LatencyMonitor ---
#[test]
fn test_muxing_stats() {
let mut stats = MuxingStats::new();
assert_eq!(stats.packets_written, 0);
assert_eq!(stats.bytes_written, 0);
stats.update(1000, 1.0);
assert_eq!(stats.packets_written, 1);
assert_eq!(stats.bytes_written, 1000);
assert!(stats.avg_bitrate > 0.0);
stats.update(2000, 2.0);
assert_eq!(stats.packets_written, 2);
assert_eq!(stats.bytes_written, 3000);
}
#[test]
fn test_latency_monitor() {
let mut monitor = LatencyMonitor::new(Duration::from_millis(100));
monitor.record(Duration::from_millis(50));
monitor.record(Duration::from_millis(60));
monitor.record(Duration::from_millis(70));
// Mean of 50/60/70 ms is 60 ms; a 1 ms tolerance is allowed either way.
let avg = monitor.average_latency().expect("operation should succeed");
assert!(avg >= Duration::from_millis(59) && avg <= Duration::from_millis(61));
assert!(monitor.is_within_target());
}
// --- CmafChunkedConfig ---
#[test]
fn test_cmaf_chunked_config_default() {
let config = CmafChunkedConfig::new();
assert_eq!(config.mode, CmafChunkMode::Standard);
assert_eq!(config.chunk_duration_ms, 500);
assert_eq!(config.max_samples_per_chunk, 5);
assert!(!config.include_mfra);
assert!(!config.signal_low_latency);
assert!(config.part_target_duration_ms.is_none());
}
#[test]
fn test_cmaf_chunked_config_builder() {
let config = CmafChunkedConfig::new()
.with_mode(CmafChunkMode::LowLatencyChunked)
.with_chunk_duration_ms(200)
.with_max_samples_per_chunk(1)
.with_mfra(true)
.with_low_latency_signal(true)
.with_part_target_duration_ms(333);
assert_eq!(config.mode, CmafChunkMode::LowLatencyChunked);
assert_eq!(config.chunk_duration_ms, 200);
assert_eq!(config.max_samples_per_chunk, 1);
assert!(config.include_mfra);
assert!(config.signal_low_latency);
assert_eq!(config.part_target_duration_ms, Some(333));
}
// --- CmafChunkedEncoder ---
// Helper: 100-byte sample on track 1 whose dts equals its pts.
fn make_sample(pts: u64, duration: u32, keyframe: bool) -> ChunkSample {
ChunkSample {
pts,
dts: pts,
duration,
data: vec![0xAA; 100],
is_keyframe: keyframe,
track_id: 1,
}
}
#[test]
fn test_encoder_new() {
let encoder = CmafChunkedEncoder::new(CmafChunkedConfig::new(), 90000);
assert_eq!(encoder.timescale(), 90000);
assert_eq!(encoder.total_chunks_produced(), 0);
assert_eq!(encoder.total_bytes_produced(), 0);
assert_eq!(encoder.pending_sample_count(), 0);
assert_eq!(encoder.available_chunks(), 0);
}
// Standard mode only buffers; nothing is emitted until a flush.
#[test]
fn test_standard_mode_no_auto_chunks() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::Standard);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.push_sample(make_sample(3000, 3000, false));
encoder.push_sample(make_sample(6000, 3000, false));
assert_eq!(encoder.available_chunks(), 0);
assert_eq!(encoder.pending_sample_count(), 3);
}
// Low-latency mode emits one chunk per pushed sample.
#[test]
fn test_low_latency_one_sample_per_chunk() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
assert_eq!(encoder.available_chunks(), 1);
encoder.push_sample(make_sample(3000, 3000, false));
assert_eq!(encoder.available_chunks(), 2);
encoder.push_sample(make_sample(6000, 3000, false));
assert_eq!(encoder.available_chunks(), 3);
}
// 100 ms at a 90000 timescale is 9000 ticks, so the third 3000-tick sample
// reaches the threshold and triggers the chunk.
#[test]
fn test_chunked_mode_duration_based() {
let config = CmafChunkedConfig::new()
.with_mode(CmafChunkMode::Chunked)
.with_chunk_duration_ms(100)
.with_max_samples_per_chunk(100);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
assert_eq!(encoder.available_chunks(), 0);
encoder.push_sample(make_sample(3000, 3000, false));
assert_eq!(encoder.available_chunks(), 0);
encoder.push_sample(make_sample(6000, 3000, false));
assert_eq!(encoder.available_chunks(), 1);
}
// The duration threshold is set far out of reach so only the sample-count
// limit (2) can trigger emission.
#[test]
fn test_chunked_mode_max_samples() {
let config = CmafChunkedConfig::new()
.with_mode(CmafChunkMode::Chunked)
.with_chunk_duration_ms(99999) .with_max_samples_per_chunk(2);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
assert_eq!(encoder.available_chunks(), 0);
encoder.push_sample(make_sample(3000, 3000, false));
assert_eq!(encoder.available_chunks(), 1);
}
// Chunks come out oldest-first with increasing sequence numbers.
#[test]
fn test_pop_chunk() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.push_sample(make_sample(3000, 3000, false));
let chunk1 = encoder.pop_chunk().expect("should have chunk");
assert_eq!(chunk1.sequence_number, 1);
assert!(chunk1.starts_with_keyframe);
let chunk2 = encoder.pop_chunk().expect("should have chunk");
assert_eq!(chunk2.sequence_number, 2);
assert!(!chunk2.starts_with_keyframe);
assert!(encoder.pop_chunk().is_none());
}
#[test]
fn test_drain_chunks() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
for i in 0..5 {
encoder.push_sample(make_sample(i * 3000, 3000, i == 0));
}
let chunks = encoder.drain_chunks();
assert_eq!(chunks.len(), 5);
assert_eq!(encoder.available_chunks(), 0);
}
// Flushing in standard mode emits the buffered samples as one final chunk.
#[test]
fn test_flush_segment() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::Standard);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.push_sample(make_sample(3000, 3000, false));
let chunks = encoder.flush_segment();
assert_eq!(chunks.len(), 1);
assert!(chunks[0].is_last_in_segment);
assert_eq!(chunks[0].sample_count, 2);
assert_eq!(encoder.current_segment_sequence(), 2);
}
// With nothing pending, the flush marks the newest queued chunk as last.
#[test]
fn test_flush_segment_with_existing_chunks() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.push_sample(make_sample(3000, 3000, false));
let chunks = encoder.flush_segment();
assert_eq!(chunks.len(), 2);
assert!(chunks.last().map_or(false, |c| c.is_last_in_segment));
}
// Only checks that the four-character box types appear somewhere in the bytes.
#[test]
fn test_chunk_contains_moof_mdat() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
let chunk = encoder.pop_chunk().expect("should have chunk");
assert!(!chunk.data.is_empty());
let has_moof = chunk.data.windows(4).any(|w| w == b"moof");
let has_mdat = chunk.data.windows(4).any(|w| w == b"mdat");
assert!(has_moof, "chunk must contain moof box");
assert!(has_mdat, "chunk must contain mdat box");
}
#[test]
fn test_chunk_metadata() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(9000, 3000, true));
let chunk = encoder.pop_chunk().expect("should have chunk");
assert_eq!(chunk.start_pts, 9000);
assert_eq!(chunk.duration, 3000);
assert_eq!(chunk.sample_count, 1);
assert!(chunk.starts_with_keyframe);
assert!(chunk.is_independent);
assert_eq!(chunk.segment_sequence, 1);
assert_eq!(chunk.chunk_index, 0);
}
#[test]
fn test_chunk_indices_increment() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
for i in 0..3 {
encoder.push_sample(make_sample(i * 3000, 3000, i == 0));
}
let chunks = encoder.drain_chunks();
assert_eq!(chunks[0].chunk_index, 0);
assert_eq!(chunks[1].chunk_index, 1);
assert_eq!(chunks[2].chunk_index, 2);
}
#[test]
fn test_segment_sequence_advances() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.flush_segment();
assert_eq!(encoder.current_segment_sequence(), 2);
encoder.push_sample(make_sample(3000, 3000, true));
encoder.flush_segment();
assert_eq!(encoder.current_segment_sequence(), 3);
}
#[test]
fn test_total_bytes_tracked() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
encoder.push_sample(make_sample(0, 3000, true));
encoder.push_sample(make_sample(3000, 3000, false));
assert_eq!(encoder.total_chunks_produced(), 2);
assert!(encoder.total_bytes_produced() > 0);
}
#[test]
fn test_flush_empty_encoder() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::Standard);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
let chunks = encoder.flush_segment();
assert!(chunks.is_empty());
}
// The in-segment chunk index restarts at 0 after each flush.
#[test]
fn test_multiple_segments() {
let config = CmafChunkedConfig::new().with_mode(CmafChunkMode::LowLatencyChunked);
let mut encoder = CmafChunkedEncoder::new(config, 90000);
for i in 0..3 {
encoder.push_sample(make_sample(i * 3000, 3000, i == 0));
}
let seg1 = encoder.flush_segment();
assert_eq!(seg1.len(), 3);
assert_eq!(seg1[0].segment_sequence, 1);
for i in 0..2 {
encoder.push_sample(make_sample(9000 + i * 3000, 3000, i == 0));
}
let seg2 = encoder.flush_segment();
assert_eq!(seg2.len(), 2);
assert_eq!(seg2[0].segment_sequence, 2);
assert_eq!(seg2[0].chunk_index, 0); }
// --- CmafChunkWriter ---
// Helper: 256-byte sample with the given timing and keyframe flag.
fn make_cmaf_sample(pts: i64, duration: u32, is_keyframe: bool) -> CmafSample {
CmafSample::new(pts, duration, is_keyframe, vec![0xBB; 256])
}
#[test]
fn test_chunk_writer_basic_moof_mdat() {
let mut writer = CmafChunkWriter::new(500);
let s = make_cmaf_sample(0, 3000, true);
let chunk = writer.write_sample(&s).expect("should return chunk");
assert!(!chunk.data.is_empty());
assert!(
chunk.data.windows(4).any(|w| w == b"moof"),
"chunk must contain moof box"
);
assert!(
chunk.data.windows(4).any(|w| w == b"mdat"),
"chunk must contain mdat box"
);
}
#[test]
fn test_chunk_writer_pts_range() {
let mut writer = CmafChunkWriter::new(500);
let s = make_cmaf_sample(9000, 3000, false);
let chunk = writer.write_sample(&s).expect("should return chunk");
assert_eq!(chunk.pts_start, 9000);
assert_eq!(chunk.pts_end, 12000);
assert!(!chunk.is_keyframe);
}
#[test]
fn test_chunk_writer_keyframe_flag() {
let mut writer = CmafChunkWriter::new(500);
let s = make_cmaf_sample(0, 3000, true);
let chunk = writer.write_sample(&s).expect("should return chunk");
assert!(chunk.is_keyframe);
}
#[test]
fn test_chunk_writer_idx_increments() {
let mut writer = CmafChunkWriter::new(500);
for i in 0..4_i64 {
let s = make_cmaf_sample(i * 3000, 3000, i == 0);
let chunk = writer.write_sample(&s).expect("should return chunk");
assert_eq!(chunk.chunk_idx, i as u64);
}
assert_eq!(writer.chunk_count(), 4);
}
#[test]
fn test_chunk_writer_flush_segment_collects() {
let mut writer = CmafChunkWriter::new(500);
for i in 0..3_i64 {
writer.write_sample(&make_cmaf_sample(i * 3000, 3000, i == 0));
}
let seg = writer.flush_segment().expect("should have segment");
assert_eq!(seg.chunks.len(), 3);
assert_eq!(seg.segment_idx, 1);
assert_eq!(writer.segment_count(), 2);
}
#[test]
fn test_chunk_writer_flush_segment_advances_counter() {
let mut writer = CmafChunkWriter::new(500);
writer.write_sample(&make_cmaf_sample(0, 3000, true));
let seg1 = writer.flush_segment().expect("seg1");
assert_eq!(seg1.segment_idx, 1);
writer.write_sample(&make_cmaf_sample(3000, 3000, true));
let seg2 = writer.flush_segment().expect("seg2");
assert_eq!(seg2.segment_idx, 2);
assert_eq!(writer.segment_count(), 3);
}
#[test]
fn test_chunk_writer_flush_empty_is_none() {
let mut writer = CmafChunkWriter::new(500);
assert!(writer.flush_segment().is_none());
writer.write_sample(&make_cmaf_sample(0, 3000, true));
writer.flush_segment();
assert!(writer.flush_segment().is_none());
}
// `flush_chunk` returns `None` both before and after a sample is written.
#[test]
fn test_chunk_writer_flush_chunk_noop() {
let mut writer = CmafChunkWriter::new(500);
assert!(writer.flush_chunk().is_none());
writer.write_sample(&make_cmaf_sample(0, 3000, true));
assert!(writer.flush_chunk().is_none());
}
// The chunk metadata keeps the signed pts values even when pts is negative.
#[test]
fn test_chunk_writer_negative_pts_clamped() {
let mut writer = CmafChunkWriter::new(500);
let s = make_cmaf_sample(-1000, 3000, false);
let chunk = writer.write_sample(&s).expect("should return chunk");
assert_eq!(chunk.pts_start, -1000);
assert_eq!(chunk.pts_end, 2000);
}
}