#![forbid(unsafe_code)]
use oximedia_core::{OxiError, OxiResult, Rational, Timestamp};
use std::time::Duration;
use crate::{Packet, StreamInfo};
/// Settings that control how fragmented MP4 output is produced.
///
/// Build one with [`FragmentedMp4Config::new`] (usable in `const` contexts) or
/// [`Default::default`]; both yield the same values. Adjust individual fields
/// with the chainable `with_*` methods.
#[derive(Clone, Debug)]
pub struct FragmentedMp4Config {
    /// Minimum span, in milliseconds, that a fragment must cover before it may
    /// be closed.
    pub fragment_duration_ms: u64,
    /// Defaults to `true`. Not read anywhere in this file; presumably selects
    /// whether the init segment is emitted on its own — confirm with callers.
    pub separate_init_segment: bool,
    /// Defaults to `false`. Not read anywhere in this file; presumably makes
    /// every fragment carry its own initialization data — confirm with callers.
    pub self_initializing: bool,
    /// Sequence number assigned to the next media fragment (defaults to `1`).
    pub sequence_number: u32,
    /// Defaults to `true`. Not read anywhere in this file — confirm the
    /// intended meaning with callers.
    pub single_fragment: bool,
}

impl Default for FragmentedMp4Config {
    /// Same values as [`FragmentedMp4Config::new`]; delegating keeps the two
    /// constructors from drifting apart.
    fn default() -> Self {
        Self::new()
    }
}

impl FragmentedMp4Config {
    /// Creates the default configuration: 2000 ms fragments, separate init
    /// segment, not self-initializing, sequence numbers starting at 1,
    /// `single_fragment` enabled.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            fragment_duration_ms: 2000,
            separate_init_segment: true,
            self_initializing: false,
            sequence_number: 1,
            single_fragment: true,
        }
    }

    /// Sets the minimum fragment duration in milliseconds.
    #[must_use]
    pub const fn with_fragment_duration(mut self, duration_ms: u64) -> Self {
        self.fragment_duration_ms = duration_ms;
        self
    }

    /// Sets the `separate_init_segment` flag.
    #[must_use]
    pub const fn with_separate_init(mut self, enabled: bool) -> Self {
        self.separate_init_segment = enabled;
        self
    }

    /// Sets the `self_initializing` flag.
    #[must_use]
    pub const fn with_self_initializing(mut self, enabled: bool) -> Self {
        self.self_initializing = enabled;
        self
    }

    /// Sets the sequence number used for the next media fragment.
    #[must_use]
    pub const fn with_sequence_number(mut self, number: u32) -> Self {
        self.sequence_number = number;
        self
    }

    /// Sets the `single_fragment` flag.
    ///
    /// Every other field already had a builder; this one completes the set so
    /// the flag can be changed without breaking the method chain.
    #[must_use]
    pub const fn with_single_fragment(mut self, enabled: bool) -> Self {
        self.single_fragment = enabled;
        self
    }
}
/// Kind of segment carried by an `Mp4Fragment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FragmentType {
/// Initialization segment (what `FragmentedMp4Builder::build_init_segment` produces).
Init,
/// Media segment (what the builder produces when it finalizes buffered packets).
Media,
}
/// A single produced segment together with the metadata describing it.
#[derive(Debug, Clone)]
pub struct Mp4Fragment {
    /// Whether this is an init or a media segment.
    pub fragment_type: FragmentType,
    /// Sequence number assigned to the segment.
    pub sequence: u32,
    /// Serialized bytes of the segment.
    pub data: Vec<u8>,
    /// Duration in microseconds.
    pub duration_us: u64,
    /// Timestamp associated with the start of the segment.
    pub start_timestamp: Timestamp,
    /// Indices of the streams that contributed to the segment.
    pub stream_indices: Vec<usize>,
    /// Whether the segment contains at least one keyframe.
    pub has_keyframe: bool,
}

impl Mp4Fragment {
    /// Creates an empty fragment of the given type and sequence number.
    ///
    /// The payload and stream list start empty, the duration is zero, no
    /// keyframe is recorded and the start timestamp is `0` in a `1/1` timebase.
    #[must_use]
    pub fn new(fragment_type: FragmentType, sequence: u32) -> Self {
        let zero_timestamp = Timestamp::new(0, Rational::new(1, 1));
        Self {
            has_keyframe: false,
            stream_indices: Vec::new(),
            start_timestamp: zero_timestamp,
            duration_us: 0,
            data: Vec::new(),
            sequence,
            fragment_type,
        }
    }

    /// Number of payload bytes held in `data`.
    #[must_use]
    pub fn size(&self) -> usize {
        let Self { data, .. } = self;
        data.len()
    }

    /// `duration_us` expressed as a [`Duration`].
    #[must_use]
    pub const fn duration(&self) -> Duration {
        let micros = self.duration_us;
        Duration::from_micros(micros)
    }

    /// `true` when this is an initialization segment.
    #[must_use]
    pub const fn is_init(&self) -> bool {
        match self.fragment_type {
            FragmentType::Init => true,
            FragmentType::Media => false,
        }
    }

    /// `true` when this is a media segment.
    #[must_use]
    pub const fn is_media(&self) -> bool {
        match self.fragment_type {
            FragmentType::Media => true,
            FragmentType::Init => false,
        }
    }
}
#[derive(Debug)]
pub struct FragmentedMp4Builder {
config: FragmentedMp4Config,
streams: Vec<StreamInfo>,
#[allow(dead_code)]
current_fragment: Option<Mp4Fragment>,
fragment_start_time: Option<i64>,
packets_in_fragment: Vec<Packet>,
}
impl FragmentedMp4Builder {
#[must_use]
pub fn new(config: FragmentedMp4Config) -> Self {
Self {
config,
streams: Vec::new(),
current_fragment: None,
fragment_start_time: None,
packets_in_fragment: Vec::new(),
}
}
pub fn add_stream(&mut self, info: StreamInfo) -> usize {
self.streams.push(info);
self.streams.len() - 1
}
#[must_use]
pub fn streams(&self) -> &[StreamInfo] {
&self.streams
}
pub fn build_init_segment(&self) -> OxiResult<Mp4Fragment> {
if self.streams.is_empty() {
return Err(OxiError::InvalidData("No streams added".into()));
}
let mut fragment = Mp4Fragment::new(FragmentType::Init, 0);
fragment.data = b"ftyp".to_vec();
Ok(fragment)
}
pub fn add_packet(&mut self, packet: Packet) -> OxiResult<Option<Mp4Fragment>> {
if self.fragment_start_time.is_none() {
self.fragment_start_time = Some(packet.pts());
}
self.packets_in_fragment.push(packet);
if self.should_close_fragment() {
self.finalize_fragment()
} else {
Ok(None)
}
}
fn should_close_fragment(&self) -> bool {
if self.packets_in_fragment.is_empty() {
return false;
}
if let Some(last_packet) = self.packets_in_fragment.last() {
if let Some(start_time) = self.fragment_start_time {
let duration_ms = (last_packet.pts() - start_time) / 1000;
#[allow(clippy::cast_sign_loss)]
{
if duration_ms as u64 >= self.config.fragment_duration_ms
&& last_packet.is_keyframe()
{
return true;
}
}
}
}
false
}
fn finalize_fragment(&mut self) -> OxiResult<Option<Mp4Fragment>> {
if self.packets_in_fragment.is_empty() {
return Ok(None);
}
let sequence = self.config.sequence_number;
let mut fragment = Mp4Fragment::new(FragmentType::Media, sequence);
let start_pts = self
.fragment_start_time
.ok_or_else(|| OxiError::InvalidData("No start time".into()))?;
let end_pts = self
.packets_in_fragment
.last()
.map_or(start_pts, super::super::packet::Packet::pts);
#[allow(clippy::cast_sign_loss)]
{
fragment.duration_us = ((end_pts - start_pts) * 1000) as u64;
}
if let Some(first_packet) = self.packets_in_fragment.first() {
fragment.start_timestamp = first_packet.timestamp;
}
let mut stream_indices: Vec<usize> = self
.packets_in_fragment
.iter()
.map(|p| p.stream_index)
.collect();
stream_indices.sort_unstable();
stream_indices.dedup();
fragment.stream_indices = stream_indices;
fragment.has_keyframe = self
.packets_in_fragment
.iter()
.any(super::super::packet::Packet::is_keyframe);
fragment.data = b"moof".to_vec();
self.packets_in_fragment.clear();
self.fragment_start_time = None;
self.config.sequence_number += 1;
Ok(Some(fragment))
}
pub fn flush(&mut self) -> OxiResult<Option<Mp4Fragment>> {
self.finalize_fragment()
}
}
/// Pairs a track id with one input stream and optional per-sample defaults.
#[derive(Debug, Clone)]
pub struct FragmentedTrack {
/// Track identifier supplied by the caller.
pub track_id: u32,
/// Index of the input stream this track is tied to.
pub stream_index: usize,
/// Description of that stream.
pub stream_info: StreamInfo,
/// Default sample duration, if one was set (units are not established in this file).
pub default_sample_duration: Option<u32>,
/// Default sample size, if one was set (presumably bytes — confirm with callers).
pub default_sample_size: Option<u32>,
}
impl FragmentedTrack {
/// Creates a track with no default sample duration or size.
#[must_use]
pub const fn new(track_id: u32, stream_index: usize, stream_info: StreamInfo) -> Self {
Self {
track_id,
stream_index,
stream_info,
default_sample_duration: None,
default_sample_size: None,
}
}
/// Returns the track with `default_sample_duration` set to `Some(duration)`.
#[must_use]
pub const fn with_default_duration(mut self, duration: u32) -> Self {
self.default_sample_duration = Some(duration);
self
}
/// Returns the track with `default_sample_size` set to `Some(size)`.
#[must_use]
pub const fn with_default_size(mut self, size: u32) -> Self {
self.default_sample_size = Some(size);
self
}
}
/// Tracks the state of an incoming fragmented MP4 stream.
///
/// Each call to [`FragmentedMp4Ingest::ingest`] is handed one buffer whose
/// first box decides how the buffer is classified and counted.
#[derive(Debug)]
pub struct FragmentedMp4Ingest {
    /// Sequence number the next media fragment is expected to carry.
    expected_sequence: u32,
    /// Number of `moof` buffers accepted so far.
    fragments_received: u64,
    /// Total size of all accepted buffers, in bytes.
    bytes_received: u64,
    /// Number of media fragments whose sequence number was not the expected one.
    out_of_order_count: u64,
    /// Whether an `ftyp` buffer has been seen.
    init_received: bool,
}

impl FragmentedMp4Ingest {
    /// Creates an ingest state that expects sequence number `1` first.
    #[must_use]
    pub fn new() -> Self {
        Self {
            expected_sequence: 1,
            fragments_received: 0,
            bytes_received: 0,
            out_of_order_count: 0,
            init_received: false,
        }
    }

    /// Classifies one buffer by the type of its first box and updates counters.
    ///
    /// * `ftyp` marks the init segment as received.
    /// * `moof` is counted as a media fragment. Its sequence number is read
    ///   from a leading `mfhd` box when present, otherwise the expected number
    ///   is assumed. A mismatch with the expected number is counted as out of
    ///   order, and the expectation resynchronises to `sequence + 1`.
    /// * Any other box type is counted in `bytes_received` only.
    ///
    /// # Errors
    ///
    /// Returns `OxiError::InvalidData` when `data` is shorter than a box header
    /// (8 bytes) or when a `moof` arrives before any `ftyp`. Rejected buffers
    /// are not added to `bytes_received`.
    pub fn ingest(&mut self, data: &[u8]) -> OxiResult<IngestResult> {
        if data.len() < 8 {
            return Err(OxiError::InvalidData("Fragment data too short".into()));
        }
        match &data[4..8] {
            b"ftyp" => {
                self.init_received = true;
                self.bytes_received += data.len() as u64;
                Ok(IngestResult::InitSegment)
            }
            b"moof" => {
                if !self.init_received {
                    return Err(OxiError::InvalidData(
                        "Received moof before init segment".into(),
                    ));
                }
                let sequence = self
                    .parse_mfhd_sequence(data)
                    .unwrap_or(self.expected_sequence);
                if sequence != self.expected_sequence {
                    self.out_of_order_count += 1;
                }
                // The sequence number comes from untrusted input and may be
                // `u32::MAX`; wrap rather than overflow (a panic in debug builds).
                self.expected_sequence = sequence.wrapping_add(1);
                self.fragments_received += 1;
                self.bytes_received += data.len() as u64;
                Ok(IngestResult::MediaFragment { sequence })
            }
            _ => {
                self.bytes_received += data.len() as u64;
                Ok(IngestResult::OtherBox)
            }
        }
    }

    /// Reads the sequence number from an `mfhd` box placed directly after the
    /// 8-byte `moof` header.
    ///
    /// Layout relied on: bytes 12..16 hold the `mfhd` type and bytes 20..24 the
    /// big-endian sequence number. Returns `None` when the buffer is shorter
    /// than 24 bytes or the first child box is not `mfhd`.
    fn parse_mfhd_sequence(&self, data: &[u8]) -> Option<u32> {
        if data.len() < 24 || &data[12..16] != b"mfhd" {
            return None;
        }
        Some(u32::from_be_bytes([data[20], data[21], data[22], data[23]]))
    }

    /// Number of media fragments accepted so far.
    #[must_use]
    pub fn fragments_received(&self) -> u64 {
        self.fragments_received
    }

    /// Total number of bytes in all accepted buffers.
    #[must_use]
    pub fn bytes_received(&self) -> u64 {
        self.bytes_received
    }

    /// Number of media fragments that arrived with an unexpected sequence number.
    #[must_use]
    pub fn out_of_order_count(&self) -> u64 {
        self.out_of_order_count
    }

    /// Whether an init segment (`ftyp`) has been received.
    #[must_use]
    pub fn init_received(&self) -> bool {
        self.init_received
    }
}

impl Default for FragmentedMp4Ingest {
    /// Equivalent to [`FragmentedMp4Ingest::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Classification of a buffer passed to [`FragmentedMp4Ingest::ingest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IngestResult {
    /// The buffer started with an `ftyp` box.
    InitSegment,
    /// The buffer started with a `moof` box.
    MediaFragment {
        /// Sequence number read from `mfhd`, or the expected one when absent.
        sequence: u32,
    },
    /// The buffer started with any other box type.
    OtherBox,
}
/// Flavour of CMAF chunk being produced.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CmafChunkType {
    /// Ordinary chunk (used by the default configuration).
    Regular,
    /// Low-latency chunk (used by [`CmafConfig::low_latency`]).
    LowLatency,
}

/// Settings for CMAF chunk generation.
#[derive(Debug, Clone)]
pub struct CmafConfig {
    /// Target chunk duration in milliseconds.
    pub chunk_duration_ms: u64,
    /// Chunk flavour stamped onto produced chunks.
    pub chunk_type: CmafChunkType,
    /// Whether a `styp` box is prepended to each chunk.
    pub add_styp: bool,
    /// Brand written into the `styp` box.
    pub brand: String,
    /// Chunked-transfer flag.
    pub chunked_transfer: bool,
}

impl Default for CmafConfig {
    /// Regular 2000 ms chunks with a `styp` box, brand `cmfc`, and chunked
    /// transfer disabled.
    fn default() -> Self {
        Self {
            brand: String::from("cmfc"),
            add_styp: true,
            chunked_transfer: false,
            chunk_type: CmafChunkType::Regular,
            chunk_duration_ms: 2000,
        }
    }
}

impl CmafConfig {
    /// Same as [`CmafConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Low-latency preset: 500 ms chunks, brand `cmfl`, chunked transfer on.
    #[must_use]
    pub fn low_latency() -> Self {
        Self {
            brand: String::from("cmfl"),
            add_styp: true,
            chunked_transfer: true,
            chunk_type: CmafChunkType::LowLatency,
            chunk_duration_ms: 500,
        }
    }

    /// Returns the configuration with the chunk duration replaced.
    #[must_use]
    pub fn with_chunk_duration_ms(self, ms: u64) -> Self {
        Self {
            chunk_duration_ms: ms,
            ..self
        }
    }

    /// Returns the configuration with the chunk type replaced.
    #[must_use]
    pub fn with_chunk_type(self, ct: CmafChunkType) -> Self {
        Self {
            chunk_type: ct,
            ..self
        }
    }

    /// Returns the configuration with the chunked-transfer flag replaced.
    #[must_use]
    pub fn with_chunked_transfer(self, enabled: bool) -> Self {
        Self {
            chunked_transfer: enabled,
            ..self
        }
    }
}
/// One CMAF chunk and the metadata describing it.
#[derive(Debug, Clone)]
pub struct CmafChunk {
    /// Sequence number of the chunk.
    pub sequence: u32,
    /// Serialized bytes of the chunk.
    pub data: Vec<u8>,
    /// Duration in microseconds.
    pub duration_us: u64,
    /// Keyframe flag for the start of the chunk.
    pub starts_with_keyframe: bool,
    /// Flavour of the chunk.
    pub chunk_type: CmafChunkType,
    /// Independence flag for the chunk.
    pub independent: bool,
}

impl CmafChunk {
    /// Creates an empty chunk: no payload, zero duration, both flags cleared.
    #[must_use]
    pub fn new(sequence: u32, chunk_type: CmafChunkType) -> Self {
        Self {
            chunk_type,
            sequence,
            independent: false,
            starts_with_keyframe: false,
            duration_us: 0,
            data: Vec::new(),
        }
    }

    /// Number of payload bytes held in `data`.
    #[must_use]
    pub fn size(&self) -> usize {
        let Self { data, .. } = self;
        data.len()
    }

    /// `duration_us` expressed as a [`Duration`].
    #[must_use]
    pub fn duration(&self) -> Duration {
        let micros = self.duration_us;
        Duration::from_micros(micros)
    }
}
#[derive(Debug)]
pub struct CmafChunkBuilder {
config: CmafConfig,
sequence: u32,
}
impl CmafChunkBuilder {
#[must_use]
pub fn new(config: CmafConfig) -> Self {
Self {
config,
sequence: 1,
}
}
pub fn fragment_to_chunks(&mut self, fragment: &Mp4Fragment) -> Vec<CmafChunk> {
if fragment.is_init() {
return Vec::new();
}
let mut chunk = CmafChunk::new(self.sequence, self.config.chunk_type);
chunk.duration_us = fragment.duration_us;
chunk.starts_with_keyframe = fragment.has_keyframe;
chunk.independent = fragment.has_keyframe;
let mut data = Vec::new();
if self.config.add_styp {
let brand_bytes = self.config.brand.as_bytes();
let brand = if brand_bytes.len() >= 4 {
[
brand_bytes[0],
brand_bytes[1],
brand_bytes[2],
brand_bytes[3],
]
} else {
[b'c', b'm', b'f', b'c']
};
let styp_size: u32 = 16; data.extend_from_slice(&styp_size.to_be_bytes());
data.extend_from_slice(b"styp");
data.extend_from_slice(&brand);
data.extend_from_slice(&0u32.to_be_bytes()); }
data.extend_from_slice(&fragment.data);
chunk.data = data;
self.sequence += 1;
vec![chunk]
}
#[must_use]
pub fn current_sequence(&self) -> u32 {
self.sequence
}
#[must_use]
pub fn config(&self) -> &CmafConfig {
&self.config
}
}
/// Locates `moof`/`mdat` pairs in a buffer of top-level MP4 boxes.
#[derive(Debug, Clone)]
pub struct FragmentBoundaryDetector {
    /// Boundaries found by the most recent [`FragmentBoundaryDetector::scan`].
    boundaries: Vec<FragmentBoundary>,
}

/// Position of one `moof` box and of the `mdat` box that directly follows it.
#[derive(Debug, Clone)]
pub struct FragmentBoundary {
    /// Byte offset of the `moof` box within the scanned buffer.
    pub moof_offset: u64,
    /// Size declared in the `moof` box header.
    pub moof_size: u32,
    /// Offset of the `mdat` header directly after the `moof`, if one was found.
    pub mdat_offset: Option<u64>,
    /// Size declared in that `mdat` header, if one was found.
    pub mdat_size: Option<u32>,
    /// Always `true` for boundaries produced by `scan`.
    pub valid: bool,
}

impl FragmentBoundaryDetector {
    /// Creates a detector with no recorded boundaries.
    #[must_use]
    pub fn new() -> Self {
        Self {
            boundaries: Vec::new(),
        }
    }

    /// Reads the 32-bit size and four-character type of the box header at
    /// `offset`, or `None` when fewer than 8 bytes are available there.
    fn box_header(data: &[u8], offset: u64) -> Option<(u32, [u8; 4])> {
        let end = offset.checked_add(8)?;
        // `usize` to `u64` never loses information on supported targets.
        if end > data.len() as u64 {
            return None;
        }
        // `offset < end <= data.len()`, so narrowing to `usize` cannot truncate.
        // Casting before this check could alias earlier data on 32-bit targets.
        let pos = offset as usize;
        let header = &data[pos..pos + 8];
        let size = u32::from_be_bytes([header[0], header[1], header[2], header[3]]);
        Some((size, [header[4], header[5], header[6], header[7]]))
    }

    /// Walks the top-level boxes of `data` and records every `moof` box.
    ///
    /// Previously recorded boundaries are discarded first. For each `moof`,
    /// the box header directly after it is inspected and recorded when it is
    /// an `mdat`. The walk stops at the first header declaring a size below 8
    /// (this includes the special sizes 0 and 1) or when fewer than 8 bytes
    /// remain.
    pub fn scan(&mut self, data: &[u8]) {
        self.boundaries.clear();
        let mut offset = 0u64;
        while let Some((size, box_type)) = Self::box_header(data, offset) {
            if size < 8 {
                break;
            }
            let next_offset = offset.saturating_add(u64::from(size));
            if &box_type == b"moof" {
                let mdat =
                    Self::box_header(data, next_offset).filter(|(_, kind)| kind == b"mdat");
                self.boundaries.push(FragmentBoundary {
                    moof_offset: offset,
                    moof_size: size,
                    mdat_offset: mdat.map(|_| next_offset),
                    mdat_size: mdat.map(|(mdat_size, _)| mdat_size),
                    valid: true,
                });
            }
            offset = next_offset;
        }
    }

    /// Boundaries found by the most recent scan, in buffer order.
    #[must_use]
    pub fn boundaries(&self) -> &[FragmentBoundary] {
        &self.boundaries
    }

    /// Number of `moof` boxes found by the most recent scan.
    #[must_use]
    pub fn fragment_count(&self) -> usize {
        self.boundaries.len()
    }

    /// `true` when every recorded boundary is valid and has an `mdat`;
    /// vacuously `true` when nothing was recorded.
    #[must_use]
    pub fn all_valid(&self) -> bool {
        self.boundaries
            .iter()
            .all(|b| b.valid && b.mdat_offset.is_some())
    }
}

impl Default for FragmentBoundaryDetector {
    /// Equivalent to [`FragmentBoundaryDetector::new`].
    fn default() -> Self {
        Self::new()
    }
}
// Unit tests for the configuration types, the builder, the ingest tracker,
// the CMAF helpers and the fragment boundary detector.
#[cfg(test)]
mod tests {
use super::*;
use oximedia_core::Rational;
// `Default` yields the documented default values.
#[test]
fn test_config_default() {
let config = FragmentedMp4Config::default();
assert_eq!(config.fragment_duration_ms, 2000);
assert!(config.separate_init_segment);
assert!(!config.self_initializing);
assert_eq!(config.sequence_number, 1);
}
// Every `with_*` builder overrides exactly its own field.
#[test]
fn test_config_builder() {
let config = FragmentedMp4Config::new()
.with_fragment_duration(3000)
.with_separate_init(false)
.with_self_initializing(true)
.with_sequence_number(10);
assert_eq!(config.fragment_duration_ms, 3000);
assert!(!config.separate_init_segment);
assert!(config.self_initializing);
assert_eq!(config.sequence_number, 10);
}
// A freshly created init fragment is empty and reports the right kind.
#[test]
fn test_fragment_creation() {
let fragment = Mp4Fragment::new(FragmentType::Init, 0);
assert!(fragment.is_init());
assert!(!fragment.is_media());
assert_eq!(fragment.sequence, 0);
assert_eq!(fragment.size(), 0);
}
// Adding a stream returns its index and makes it visible via `streams()`.
#[test]
fn test_builder() {
let config = FragmentedMp4Config::default();
let mut builder = FragmentedMp4Builder::new(config);
let mut stream_info =
StreamInfo::new(0, oximedia_core::CodecId::Opus, Rational::new(1, 48000));
stream_info.codec_params = crate::stream::CodecParams::audio(48000, 2);
let index = builder.add_stream(stream_info);
assert_eq!(index, 0);
assert_eq!(builder.streams().len(), 1);
}
// The track builders store the supplied default duration and size.
#[test]
fn test_fragmented_track() {
let mut stream_info =
StreamInfo::new(0, oximedia_core::CodecId::Opus, Rational::new(1, 48000));
stream_info.codec_params = crate::stream::CodecParams::audio(48000, 2);
let track = FragmentedTrack::new(1, 0, stream_info)
.with_default_duration(960)
.with_default_size(100);
assert_eq!(track.track_id, 1);
assert_eq!(track.stream_index, 0);
assert_eq!(track.default_sample_duration, Some(960));
assert_eq!(track.default_sample_size, Some(100));
}
// A new ingest tracker starts with zeroed counters and no init segment.
#[test]
fn test_ingest_new() {
let ingest = FragmentedMp4Ingest::new();
assert!(!ingest.init_received());
assert_eq!(ingest.fragments_received(), 0);
assert_eq!(ingest.bytes_received(), 0);
}
// Buffers shorter than a box header (8 bytes) are rejected.
#[test]
fn test_ingest_too_short() {
let mut ingest = FragmentedMp4Ingest::new();
let result = ingest.ingest(&[0u8; 4]);
assert!(result.is_err());
}
// An `ftyp` box is classified as the init segment.
#[test]
fn test_ingest_ftyp() {
let mut data = vec![0u8; 20];
data[0..4].copy_from_slice(&20u32.to_be_bytes());
data[4..8].copy_from_slice(b"ftyp");
data[8..12].copy_from_slice(b"iso5");
let mut ingest = FragmentedMp4Ingest::new();
let result = ingest.ingest(&data).expect("ingest ok");
assert_eq!(result, IngestResult::InitSegment);
assert!(ingest.init_received());
}
// A `moof` that arrives before any `ftyp` is an error.
#[test]
fn test_ingest_moof_before_init() {
let mut data = vec![0u8; 24];
data[0..4].copy_from_slice(&24u32.to_be_bytes());
data[4..8].copy_from_slice(b"moof");
let mut ingest = FragmentedMp4Ingest::new();
let result = ingest.ingest(&data);
assert!(result.is_err()); }
// After the init segment, a `moof` with an `mfhd` child reports its sequence number.
#[test]
fn test_ingest_moof_after_init() {
let mut ingest = FragmentedMp4Ingest::new();
let mut ftyp = vec![0u8; 20];
ftyp[0..4].copy_from_slice(&20u32.to_be_bytes());
ftyp[4..8].copy_from_slice(b"ftyp");
ingest.ingest(&ftyp).expect("ftyp ok");
let mut moof = vec![0u8; 24];
moof[0..4].copy_from_slice(&24u32.to_be_bytes());
moof[4..8].copy_from_slice(b"moof");
// Child box: 16-byte `mfhd` header, then zeroed bytes 16..20, then sequence number 1.
moof[8..12].copy_from_slice(&16u32.to_be_bytes()); moof[12..16].copy_from_slice(b"mfhd");
moof[16..20].copy_from_slice(&0u32.to_be_bytes()); moof[20..24].copy_from_slice(&1u32.to_be_bytes());
let result = ingest.ingest(&moof).expect("moof ok");
assert_eq!(result, IngestResult::MediaFragment { sequence: 1 });
assert_eq!(ingest.fragments_received(), 1);
}
// Box types other than `ftyp`/`moof` are reported as `OtherBox`.
#[test]
fn test_ingest_other_box() {
let mut ingest = FragmentedMp4Ingest::new();
let mut data = vec![0u8; 16];
data[0..4].copy_from_slice(&16u32.to_be_bytes());
data[4..8].copy_from_slice(b"mdat");
let result = ingest.ingest(&data).expect("ingest ok");
assert_eq!(result, IngestResult::OtherBox);
}
// The default CMAF configuration uses regular 2000 ms chunks with `styp`.
#[test]
fn test_cmaf_config_default() {
let cfg = CmafConfig::default();
assert_eq!(cfg.chunk_duration_ms, 2000);
assert_eq!(cfg.chunk_type, CmafChunkType::Regular);
assert!(cfg.add_styp);
assert!(!cfg.chunked_transfer);
}
// The low-latency preset uses 500 ms chunks with chunked transfer enabled.
#[test]
fn test_cmaf_config_low_latency() {
let cfg = CmafConfig::low_latency();
assert_eq!(cfg.chunk_duration_ms, 500);
assert_eq!(cfg.chunk_type, CmafChunkType::LowLatency);
assert!(cfg.chunked_transfer);
}
// The CMAF builder methods override their respective fields.
#[test]
fn test_cmaf_config_builder() {
let cfg = CmafConfig::new()
.with_chunk_duration_ms(1000)
.with_chunk_type(CmafChunkType::LowLatency)
.with_chunked_transfer(true);
assert_eq!(cfg.chunk_duration_ms, 1000);
assert_eq!(cfg.chunk_type, CmafChunkType::LowLatency);
assert!(cfg.chunked_transfer);
}
// A new chunk is empty and has its keyframe flag cleared.
#[test]
fn test_cmaf_chunk_new() {
let chunk = CmafChunk::new(1, CmafChunkType::Regular);
assert_eq!(chunk.sequence, 1);
assert_eq!(chunk.size(), 0);
assert!(!chunk.starts_with_keyframe);
}
// A media fragment becomes one chunk that is larger than the fragment
// (a `styp` box is prepended) and the builder's sequence advances.
#[test]
fn test_cmaf_chunk_builder_from_fragment() {
let mut fragment = Mp4Fragment::new(FragmentType::Media, 1);
fragment.data = b"moof_mdat_data".to_vec();
fragment.duration_us = 2_000_000;
fragment.has_keyframe = true;
let cfg = CmafConfig::new();
let mut builder = CmafChunkBuilder::new(cfg);
let chunks = builder.fragment_to_chunks(&fragment);
assert_eq!(chunks.len(), 1);
assert!(chunks[0].starts_with_keyframe);
assert_eq!(chunks[0].duration_us, 2_000_000);
assert!(chunks[0].data.len() > fragment.data.len());
assert_eq!(builder.current_sequence(), 2);
}
// Init fragments produce no chunks.
#[test]
fn test_cmaf_chunk_builder_skips_init() {
let fragment = Mp4Fragment::new(FragmentType::Init, 0);
let cfg = CmafConfig::new();
let mut builder = CmafChunkBuilder::new(cfg);
let chunks = builder.fragment_to_chunks(&fragment);
assert!(chunks.is_empty());
}
// Scanning an empty buffer finds nothing; `all_valid` is vacuously true.
#[test]
fn test_boundary_detector_empty() {
let mut detector = FragmentBoundaryDetector::new();
detector.scan(&[]);
assert_eq!(detector.fragment_count(), 0);
assert!(detector.all_valid());
}
// One 16-byte `moof` followed by a 12-byte `mdat` yields one complete boundary.
#[test]
fn test_boundary_detector_single_moof_mdat() {
let mut data = Vec::new();
data.extend_from_slice(&16u32.to_be_bytes());
data.extend_from_slice(b"moof");
// Eight filler bytes complete the moof; the mdat size field follows.
data.extend_from_slice(&[0u8; 8]); data.extend_from_slice(&12u32.to_be_bytes());
data.extend_from_slice(b"mdat");
data.extend_from_slice(&[0u8; 4]);
let mut detector = FragmentBoundaryDetector::new();
detector.scan(&data);
assert_eq!(detector.fragment_count(), 1);
let b = &detector.boundaries()[0];
assert_eq!(b.moof_offset, 0);
assert_eq!(b.moof_size, 16);
assert!(b.mdat_offset.is_some());
assert_eq!(b.mdat_size, Some(12));
assert!(detector.all_valid());
}
// A `moof` with no following `mdat` is recorded but makes `all_valid` false.
#[test]
fn test_boundary_detector_moof_without_mdat() {
let mut data = Vec::new();
data.extend_from_slice(&16u32.to_be_bytes());
data.extend_from_slice(b"moof");
data.extend_from_slice(&[0u8; 8]);
let mut detector = FragmentBoundaryDetector::new();
detector.scan(&data);
assert_eq!(detector.fragment_count(), 1);
assert!(detector.boundaries()[0].mdat_offset.is_none());
assert!(!detector.all_valid()); }
// Two consecutive moof/mdat pairs yield two complete boundaries.
#[test]
fn test_boundary_detector_two_fragments() {
let mut data = Vec::new();
for _ in 0..2 {
data.extend_from_slice(&16u32.to_be_bytes());
data.extend_from_slice(b"moof");
data.extend_from_slice(&[0u8; 8]);
data.extend_from_slice(&12u32.to_be_bytes());
data.extend_from_slice(b"mdat");
data.extend_from_slice(&[0u8; 4]);
}
let mut detector = FragmentBoundaryDetector::new();
detector.scan(&data);
assert_eq!(detector.fragment_count(), 2);
assert!(detector.all_valid());
}
}