#![forbid(unsafe_code)]
use oximedia_core::{CodecId, OxiError, OxiResult};
use std::collections::{HashMap, VecDeque};
use crate::{Packet, StreamInfo};
/// Strategy `TrackManager::get_next_packet` uses to pick the next buffered packet.
///
/// The mode is only consulted when `TrackManagerConfig::auto_interleave` is `true`;
/// otherwise packets are returned as with [`SyncMode::None`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
/// Return the front packet with the smallest `pts()` across all enabled tracks.
All,
/// Return the front packet with the smallest `pts()` among enabled tracks whose codec is
/// AV1, VP8 or VP9; when no such packet is buffered, fall back to the first buffered
/// packet in track-index order.
Video,
/// No timestamp ordering: return the first buffered packet in track-index order.
None,
}
/// Settings that control how a `TrackManager` buffers and orders packets.
#[derive(Debug, Clone)]
pub struct TrackManagerConfig {
/// How the next packet is chosen when `auto_interleave` is enabled.
pub sync_mode: SyncMode,
/// Per-track buffer length at which `TrackManager::add_packet` evicts the oldest packet
/// before storing a new one.
pub max_buffer_size: usize,
/// When `false`, `TrackManager::get_next_packet` ignores `sync_mode` and returns the first
/// buffered packet in track-index order.
pub auto_interleave: bool,
/// Interleave duration setting; the name indicates milliseconds.
/// NOTE(review): no code visible in this file reads this value — confirm where it is used.
pub interleave_duration_ms: u64,
}
impl Default for TrackManagerConfig {
fn default() -> Self {
Self {
sync_mode: SyncMode::All,
max_buffer_size: 100,
auto_interleave: true,
interleave_duration_ms: 500,
}
}
}
impl TrackManagerConfig {
    /// Creates the baseline configuration: `SyncMode::All`, at most 100 buffered packets per
    /// track, auto-interleaving enabled and a 500 ms interleave duration.
    ///
    /// Usable in `const` contexts, unlike `Default::default`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            interleave_duration_ms: 500,
            auto_interleave: true,
            max_buffer_size: 100,
            sync_mode: SyncMode::All,
        }
    }

    /// Returns the configuration with `sync_mode` replaced by `mode`.
    #[must_use]
    pub const fn with_sync_mode(self, mode: SyncMode) -> Self {
        Self {
            sync_mode: mode,
            ..self
        }
    }

    /// Returns the configuration with `max_buffer_size` replaced by `size`.
    #[must_use]
    pub const fn with_max_buffer_size(self, size: usize) -> Self {
        Self {
            max_buffer_size: size,
            ..self
        }
    }

    /// Returns the configuration with `auto_interleave` replaced by `enabled`.
    #[must_use]
    pub const fn with_auto_interleave(self, enabled: bool) -> Self {
        Self {
            auto_interleave: enabled,
            ..self
        }
    }

    /// Returns the configuration with `interleave_duration_ms` replaced by `duration_ms`.
    #[must_use]
    pub const fn with_interleave_duration(self, duration_ms: u64) -> Self {
        Self {
            interleave_duration_ms: duration_ms,
            ..self
        }
    }
}
/// A stream registered with a `TrackManager`, plus per-track settings and optional metadata.
#[derive(Debug, Clone)]
pub struct TrackInfo {
/// Description of the underlying stream.
pub stream_info: StreamInfo,
/// Whether the track is active; `TrackManager::add_packet` silently discards packets for
/// tracks where this is `false`.
pub enabled: bool,
/// Track priority.
/// NOTE(review): no code visible in this file reads this value — confirm its semantics.
pub priority: i32,
/// Optional language tag for the track.
pub language: Option<String>,
/// Optional label for the track.
pub label: Option<String>,
}
impl TrackInfo {
    /// Wraps `stream_info` in a track that is enabled, has priority `0` and carries no
    /// language or label.
    #[must_use]
    pub const fn new(stream_info: StreamInfo) -> Self {
        Self {
            label: None,
            language: None,
            priority: 0,
            enabled: true,
            stream_info,
        }
    }

    /// Returns the track with its `enabled` flag set to `enabled`.
    #[must_use]
    pub const fn with_enabled(self, enabled: bool) -> Self {
        let mut track = self;
        track.enabled = enabled;
        track
    }

    /// Returns the track with its `priority` set to `priority`.
    #[must_use]
    pub const fn with_priority(self, priority: i32) -> Self {
        let mut track = self;
        track.priority = priority;
        track
    }

    /// Returns the track with `language` set to `Some(language)`.
    #[must_use]
    pub fn with_language(self, language: impl Into<String>) -> Self {
        Self {
            language: Some(language.into()),
            ..self
        }
    }

    /// Returns the track with `label` set to `Some(label)`.
    #[must_use]
    pub fn with_label(self, label: impl Into<String>) -> Self {
        Self {
            label: Some(label.into()),
            ..self
        }
    }
}
/// Running counters for a single track.
#[derive(Debug, Clone, Copy, Default)]
pub struct TrackStats {
/// Number of packets passed to `update`.
pub packets_received: u64,
/// Number of times `record_drop` was called.
pub packets_dropped: u64,
/// Sum of `Packet::size()` over all packets passed to `update`.
pub bytes_received: u64,
/// `pts()` of the packet most recently passed to `update`, or `None` if there was none.
pub last_timestamp: Option<i64>,
}
impl TrackStats {
    /// Creates statistics with every counter at zero and no timestamp recorded.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            last_timestamp: None,
            bytes_received: 0,
            packets_dropped: 0,
            packets_received: 0,
        }
    }

    /// Accounts for one received `packet`: bumps the packet count, adds the packet's size to
    /// the byte total and remembers its `pts()` as the latest timestamp.
    pub fn update(&mut self, packet: &Packet) {
        let payload_len = packet.size() as u64;
        let timestamp = packet.pts();

        self.packets_received += 1;
        self.bytes_received += payload_len;
        self.last_timestamp = Some(timestamp);
    }

    /// Accounts for one dropped packet.
    pub fn record_drop(&mut self) {
        self.packets_dropped += 1;
    }
}
/// Buffers incoming packets per track and hands them back out in the order selected by the
/// configured `SyncMode`.
pub struct TrackManager {
/// Buffer limit and output-ordering settings.
config: TrackManagerConfig,
/// Registered tracks; a track's position in this vector is its index.
tracks: Vec<TrackInfo>,
/// Pending packets keyed by track index, oldest packet at the front.
buffers: HashMap<usize, VecDeque<Packet>>,
/// Running statistics keyed by track index.
stats: HashMap<usize, TrackStats>,
}
impl TrackManager {
    /// Creates a manager with [`TrackManagerConfig::default`] and no tracks.
    #[must_use]
    pub fn new() -> Self {
        Self::with_config(TrackManagerConfig::default())
    }

    /// Creates a manager with the given configuration and no tracks.
    #[must_use]
    pub fn with_config(config: TrackManagerConfig) -> Self {
        Self {
            config,
            tracks: Vec::new(),
            buffers: HashMap::new(),
            stats: HashMap::new(),
        }
    }

    /// Registers a track, creating an empty buffer and zeroed statistics for it.
    ///
    /// Returns the index assigned to the track (its position in [`Self::tracks`]).
    pub fn add_track(&mut self, info: TrackInfo) -> usize {
        let index = self.tracks.len();
        self.tracks.push(info);
        self.buffers.insert(index, VecDeque::new());
        self.stats.insert(index, TrackStats::new());
        index
    }

    /// Returns all registered tracks, ordered by track index.
    #[must_use]
    pub fn tracks(&self) -> &[TrackInfo] {
        &self.tracks
    }

    /// Returns the statistics for track `index`, or `None` if no such track exists.
    #[must_use]
    pub fn track_stats(&self, index: usize) -> Option<&TrackStats> {
        self.stats.get(&index)
    }

    /// Buffers `packet` on the track named by its `stream_index`.
    ///
    /// Packets for disabled tracks are silently discarded and are not counted in the
    /// statistics. When the track's buffer already holds `max_buffer_size` packets, the
    /// oldest one is evicted and counted as a drop before the new packet is stored.
    ///
    /// # Errors
    ///
    /// Returns [`OxiError::InvalidData`] if `stream_index` does not refer to a registered
    /// track.
    pub fn add_packet(&mut self, packet: Packet) -> OxiResult<()> {
        let index = packet.stream_index;
        let track = self
            .tracks
            .get(index)
            .ok_or_else(|| OxiError::InvalidData(format!("Invalid track index: {index}")))?;
        if !track.enabled {
            return Ok(());
        }

        let buffer = self
            .buffers
            .get_mut(&index)
            .ok_or_else(|| OxiError::InvalidData("Track not found".into()))?;

        // Only count a drop when a packet was really evicted: with `max_buffer_size == 0`
        // the limit is "reached" on an empty buffer, where there is nothing to drop.
        let evicted =
            buffer.len() >= self.config.max_buffer_size && buffer.pop_front().is_some();

        if let Some(stats) = self.stats.get_mut(&index) {
            if evicted {
                stats.record_drop();
            }
            stats.update(&packet);
        }
        buffer.push_back(packet);
        Ok(())
    }

    /// Removes and returns the next packet according to the configuration, or `None` when
    /// nothing suitable is buffered.
    ///
    /// With `auto_interleave` disabled the sync mode is ignored and the first buffered
    /// packet in track-index order is returned.
    pub fn get_next_packet(&mut self) -> Option<Packet> {
        if !self.config.auto_interleave {
            return self.get_first_packet();
        }
        match self.config.sync_mode {
            SyncMode::All => self.get_synchronized_packet(),
            SyncMode::Video => self.get_video_synchronized_packet(),
            SyncMode::None => self.get_first_packet(),
        }
    }

    /// Pops the front packet of the lowest-indexed track that has anything buffered.
    ///
    /// NOTE(review): unlike the synchronized paths this does not check `enabled`, so packets
    /// buffered before a track was disabled are still returned here — confirm that is
    /// intended.
    fn get_first_packet(&mut self) -> Option<Packet> {
        (0..self.tracks.len()).find_map(|index| self.buffers.get_mut(&index)?.pop_front())
    }

    /// Pops the front packet with the smallest `pts()` across all enabled tracks.
    fn get_synchronized_packet(&mut self) -> Option<Packet> {
        self.pop_earliest(|_| true)
    }

    /// Pops the front packet with the smallest `pts()` among enabled video tracks, falling
    /// back to [`Self::get_first_packet`] when no video packet is buffered.
    fn get_video_synchronized_packet(&mut self) -> Option<Packet> {
        if let Some(packet) = self.pop_earliest(Self::is_video_track) {
            return Some(packet);
        }
        self.get_first_packet()
    }

    /// Returns `true` when the track's codec is one of the video codecs recognised by the
    /// video sync mode (AV1, VP8, VP9).
    fn is_video_track(track: &TrackInfo) -> bool {
        matches!(
            track.stream_info.codec,
            CodecId::Av1 | CodecId::Vp8 | CodecId::Vp9
        )
    }

    /// Pops the front packet with the smallest `pts()` among enabled tracks accepted by
    /// `eligible`.
    ///
    /// Tracks are scanned in index order and only a strictly smaller timestamp replaces the
    /// current candidate, so ties go to the lowest track index. Scanning the `HashMap`
    /// directly would make tie-breaking depend on hash iteration order. Tracking the minimum
    /// as an `Option` (rather than seeding it with `i64::MAX`) keeps a packet whose
    /// timestamp is `i64::MAX` selectable.
    fn pop_earliest<F>(&mut self, eligible: F) -> Option<Packet>
    where
        F: Fn(&TrackInfo) -> bool,
    {
        let mut earliest: Option<(i64, usize)> = None;
        for (index, track) in self.tracks.iter().enumerate() {
            if !track.enabled || !eligible(track) {
                continue;
            }
            let front_pts = self
                .buffers
                .get(&index)
                .and_then(VecDeque::front)
                .map(|packet| packet.pts());
            if let Some(pts) = front_pts {
                if earliest.map_or(true, |(best, _)| pts < best) {
                    earliest = Some((pts, index));
                }
            }
        }

        let (_, index) = earliest?;
        self.buffers.get_mut(&index)?.pop_front()
    }

    /// Returns the number of packets buffered for track `index` (`0` for unknown tracks).
    #[must_use]
    pub fn buffered_count(&self, index: usize) -> usize {
        self.buffers.get(&index).map_or(0, VecDeque::len)
    }

    /// Returns the number of packets buffered across all tracks.
    #[must_use]
    pub fn total_buffered(&self) -> usize {
        self.buffers.values().map(VecDeque::len).sum()
    }

    /// Discards every buffered packet on every track; statistics are left untouched.
    pub fn clear_buffers(&mut self) {
        for buffer in self.buffers.values_mut() {
            buffer.clear();
        }
    }

    /// Enables or disables track `index`.
    ///
    /// Packets already buffered for the track are kept.
    ///
    /// # Errors
    ///
    /// Returns [`OxiError::InvalidData`] if `index` does not refer to a registered track.
    pub fn set_track_enabled(&mut self, index: usize, enabled: bool) -> OxiResult<()> {
        self.tracks
            .get_mut(index)
            .ok_or_else(|| OxiError::InvalidData("Track not found".into()))?
            .enabled = enabled;
        Ok(())
    }
}
impl Default for TrackManager {
fn default() -> Self {
Self::new()
}
}
/// Helper that orders packets by timestamp for interleaved output.
pub struct InterleavingCalculator {
// The target duration is stored by `new` but not read by any method in this file, hence the
// lint suppression.
#[allow(dead_code)]
target_duration_ms: u64,
}
impl InterleavingCalculator {
    /// Creates a calculator that remembers `target_duration_ms`.
    #[must_use]
    pub const fn new(target_duration_ms: u64) -> Self {
        Self { target_duration_ms }
    }

    /// Returns the positions of `packets` arranged by ascending `pts()`.
    ///
    /// The sort is stable, so packets with equal timestamps keep their relative input order.
    #[must_use]
    pub fn calculate_order(&self, packets: &[&Packet]) -> Vec<usize> {
        let mut order: Vec<usize> = (0..packets.len()).collect();
        order.sort_by_key(|&position| packets[position].pts());
        order
    }

    /// Returns `true` when any packet has a smaller `pts()` than the packet just before it.
    ///
    /// Slices with fewer than two packets never need reordering.
    #[must_use]
    pub fn needs_reordering(&self, packets: &[&Packet]) -> bool {
        packets
            .windows(2)
            .any(|pair| pair[1].pts() < pair[0].pts())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use oximedia_core::{Rational, Timestamp};

    /// Builds a stream description with audio parameters for the given index and codec.
    fn create_test_stream(index: usize, codec: CodecId) -> StreamInfo {
        let mut stream = StreamInfo::new(index, codec, Rational::new(1, 48000));
        stream.codec_params = crate::stream::CodecParams::audio(48000, 2);
        stream
    }

    /// Builds an empty packet for `stream_index` with the given timestamp.
    fn create_test_packet(stream_index: usize, pts: i64) -> Packet {
        Packet::new(
            stream_index,
            Bytes::new(),
            Timestamp::new(pts, Rational::new(1, 1000)),
            crate::PacketFlags::empty(),
        )
    }

    #[test]
    fn test_track_manager_config() {
        let config = TrackManagerConfig::new()
            .with_sync_mode(SyncMode::Video)
            .with_max_buffer_size(50)
            .with_auto_interleave(false)
            .with_interleave_duration(1000);
        assert_eq!(config.sync_mode, SyncMode::Video);
        assert_eq!(config.max_buffer_size, 50);
        assert!(!config.auto_interleave);
        assert_eq!(config.interleave_duration_ms, 1000);
    }

    #[test]
    fn test_track_info() {
        let stream = create_test_stream(0, CodecId::Opus);
        let track = TrackInfo::new(stream)
            .with_enabled(true)
            .with_priority(10)
            .with_language("eng")
            .with_label("English");
        assert!(track.enabled);
        assert_eq!(track.priority, 10);
        assert_eq!(track.language, Some("eng".into()));
        assert_eq!(track.label, Some("English".into()));
    }

    #[test]
    fn test_track_stats() {
        let mut stats = TrackStats::new();
        assert_eq!(stats.packets_received, 0);
        let packet = create_test_packet(0, 1000);
        stats.update(&packet);
        assert_eq!(stats.packets_received, 1);
        assert_eq!(stats.last_timestamp, Some(1000));
        stats.record_drop();
        assert_eq!(stats.packets_dropped, 1);
    }

    #[test]
    fn test_track_manager() {
        let mut manager = TrackManager::new();
        let stream = create_test_stream(0, CodecId::Opus);
        let track = TrackInfo::new(stream);
        let index = manager.add_track(track);
        assert_eq!(index, 0);
        assert_eq!(manager.tracks().len(), 1);
        let packet = create_test_packet(0, 1000);
        assert!(manager.add_packet(packet).is_ok());
        assert_eq!(manager.buffered_count(0), 1);
        assert_eq!(manager.total_buffered(), 1);
    }

    #[test]
    fn test_add_packet_rejects_unknown_track() {
        let mut manager = TrackManager::new();
        assert!(manager.add_packet(create_test_packet(0, 0)).is_err());
        assert!(manager.set_track_enabled(0, true).is_err());
    }

    #[test]
    fn test_disabled_track_discards_packets() {
        let mut manager = TrackManager::new();
        manager.add_track(TrackInfo::new(create_test_stream(0, CodecId::Opus)));
        assert!(manager.set_track_enabled(0, false).is_ok());
        assert!(manager.add_packet(create_test_packet(0, 1000)).is_ok());
        assert_eq!(manager.buffered_count(0), 0);
        let stats = manager.track_stats(0).copied().unwrap_or_default();
        assert_eq!(stats.packets_received, 0);
    }

    #[test]
    fn test_full_buffer_evicts_oldest_packet() {
        let config = TrackManagerConfig::new().with_max_buffer_size(2);
        let mut manager = TrackManager::with_config(config);
        manager.add_track(TrackInfo::new(create_test_stream(0, CodecId::Opus)));
        for pts in [10, 20, 30] {
            assert!(manager.add_packet(create_test_packet(0, pts)).is_ok());
        }
        assert_eq!(manager.buffered_count(0), 2);
        let stats = manager.track_stats(0).copied().unwrap_or_default();
        assert_eq!(stats.packets_received, 3);
        assert_eq!(stats.packets_dropped, 1);
        // The packet with pts 10 was evicted, so 20 is now the oldest.
        assert_eq!(manager.get_next_packet().map(|p| p.pts()), Some(20));
    }

    #[test]
    fn test_synchronized_output_is_ordered_by_pts() {
        let mut manager = TrackManager::new();
        manager.add_track(TrackInfo::new(create_test_stream(0, CodecId::Opus)));
        manager.add_track(TrackInfo::new(create_test_stream(1, CodecId::Opus)));
        assert!(manager.add_packet(create_test_packet(0, 1000)).is_ok());
        assert!(manager.add_packet(create_test_packet(1, 500)).is_ok());
        assert!(manager.add_packet(create_test_packet(0, 1500)).is_ok());

        assert_eq!(manager.get_next_packet().map(|p| p.pts()), Some(500));
        assert_eq!(manager.get_next_packet().map(|p| p.pts()), Some(1000));
        assert_eq!(manager.get_next_packet().map(|p| p.pts()), Some(1500));
        assert!(manager.get_next_packet().is_none());
        assert_eq!(manager.total_buffered(), 0);
    }

    #[test]
    fn test_clear_buffers() {
        let mut manager = TrackManager::new();
        manager.add_track(TrackInfo::new(create_test_stream(0, CodecId::Opus)));
        assert!(manager.add_packet(create_test_packet(0, 1)).is_ok());
        manager.clear_buffers();
        assert_eq!(manager.total_buffered(), 0);
        assert!(manager.get_next_packet().is_none());
    }

    #[test]
    fn test_interleaving_calculator() {
        let calc = InterleavingCalculator::new(500);
        let packets = vec![
            create_test_packet(0, 1000),
            create_test_packet(1, 500),
            create_test_packet(0, 1500),
        ];
        let packet_refs: Vec<&Packet> = packets.iter().collect();
        assert!(calc.needs_reordering(&packet_refs));
        let order = calc.calculate_order(&packet_refs);
        assert_eq!(order, vec![1, 0, 2]);
    }
}