#![forbid(unsafe_code)]
use async_trait::async_trait;
use bytes::Bytes;
use oximedia_core::{OxiError, OxiResult};
use std::collections::VecDeque;
#[cfg(not(target_arch = "wasm32"))]
use tokio::sync::mpsc;
use crate::{Demuxer, Packet, ProbeResult, StreamInfo};
#[derive(Clone, Debug)]
pub struct StreamingDemuxerConfig {
pub initial_buffer_size: usize,
pub max_buffer_size: usize,
pub low_latency: bool,
pub read_timeout_ms: u64,
}
impl Default for StreamingDemuxerConfig {
fn default() -> Self {
Self {
initial_buffer_size: 64 * 1024, max_buffer_size: 10 * 1024 * 1024, low_latency: false,
read_timeout_ms: 5000,
}
}
}
impl StreamingDemuxerConfig {
#[must_use]
pub const fn new() -> Self {
Self {
initial_buffer_size: 64 * 1024,
max_buffer_size: 10 * 1024 * 1024,
low_latency: false,
read_timeout_ms: 5000,
}
}
#[must_use]
pub const fn with_low_latency(mut self, enabled: bool) -> Self {
self.low_latency = enabled;
self
}
#[must_use]
pub const fn with_initial_buffer(mut self, size: usize) -> Self {
self.initial_buffer_size = size;
self
}
#[must_use]
pub const fn with_max_buffer(mut self, size: usize) -> Self {
self.max_buffer_size = size;
self
}
#[must_use]
pub const fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.read_timeout_ms = timeout_ms;
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamingState {
Initializing,
Buffering,
Active,
Underrun,
Eof,
}
pub struct StreamingDemuxer<D: Demuxer> {
inner: D,
config: StreamingDemuxerConfig,
#[allow(dead_code)]
buffer: VecDeque<u8>,
state: StreamingState,
bytes_buffered: usize,
packets_read: u64,
}
impl<D: Demuxer> StreamingDemuxer<D> {
pub fn new(inner: D) -> Self {
Self::with_config(inner, StreamingDemuxerConfig::default())
}
pub fn with_config(inner: D, config: StreamingDemuxerConfig) -> Self {
let buffer_size = config.initial_buffer_size;
Self {
inner,
config,
buffer: VecDeque::with_capacity(buffer_size),
state: StreamingState::Initializing,
bytes_buffered: 0,
packets_read: 0,
}
}
#[must_use]
pub const fn state(&self) -> StreamingState {
self.state
}
#[must_use]
pub const fn bytes_buffered(&self) -> usize {
self.bytes_buffered
}
#[must_use]
pub const fn packets_read(&self) -> u64 {
self.packets_read
}
#[must_use]
pub const fn inner(&self) -> &D {
&self.inner
}
pub fn inner_mut(&mut self) -> &mut D {
&mut self.inner
}
#[must_use]
pub fn into_inner(self) -> D {
self.inner
}
fn needs_buffering(&self) -> bool {
if self.config.low_latency {
return false;
}
self.bytes_buffered < self.config.initial_buffer_size
}
fn update_state(&mut self) {
if self.bytes_buffered == 0 {
if self.state == StreamingState::Eof {
return;
}
self.state = StreamingState::Underrun;
} else if self.needs_buffering() {
self.state = StreamingState::Buffering;
} else {
self.state = StreamingState::Active;
}
}
}
#[async_trait]
impl<D: Demuxer> Demuxer for StreamingDemuxer<D> {
async fn probe(&mut self) -> OxiResult<ProbeResult> {
self.state = StreamingState::Initializing;
let result = self.inner.probe().await?;
self.update_state();
Ok(result)
}
async fn read_packet(&mut self) -> OxiResult<Packet> {
if self.needs_buffering() && self.state != StreamingState::Eof {
self.state = StreamingState::Buffering;
}
match self.inner.read_packet().await {
Ok(packet) => {
self.packets_read += 1;
self.state = StreamingState::Active;
Ok(packet)
}
Err(OxiError::Eof) => {
self.state = StreamingState::Eof;
Err(OxiError::Eof)
}
Err(e) => {
self.state = StreamingState::Underrun;
Err(e)
}
}
}
fn streams(&self) -> &[StreamInfo] {
self.inner.streams()
}
fn is_seekable(&self) -> bool {
false
}
}
#[cfg(not(target_arch = "wasm32"))]
pub struct PacketReceiver {
rx: mpsc::UnboundedReceiver<OxiResult<Packet>>,
streams: Vec<StreamInfo>,
}
#[cfg(not(target_arch = "wasm32"))]
impl PacketReceiver {
fn new(rx: mpsc::UnboundedReceiver<OxiResult<Packet>>, streams: Vec<StreamInfo>) -> Self {
Self { rx, streams }
}
pub async fn recv(&mut self) -> Option<OxiResult<Packet>> {
self.rx.recv().await
}
#[must_use]
pub fn streams(&self) -> &[StreamInfo] {
&self.streams
}
pub fn try_recv(&mut self) -> Result<OxiResult<Packet>, mpsc::error::TryRecvError> {
self.rx.try_recv()
}
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn spawn_demuxer<D: Demuxer + Send + 'static>(
mut demuxer: D,
) -> OxiResult<PacketReceiver> {
demuxer.probe().await?;
let streams = demuxer.streams().to_vec();
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
loop {
match demuxer.read_packet().await {
Ok(packet) => {
if tx.send(Ok(packet)).is_err() {
break;
}
}
Err(OxiError::Eof) => {
let _ = tx.send(Err(OxiError::Eof));
break;
}
Err(e) => {
let _ = tx.send(Err(e));
break;
}
}
}
});
Ok(PacketReceiver::new(rx, streams))
}
#[derive(Debug)]
pub struct ProgressiveBuffer {
data: VecDeque<u8>,
max_size: usize,
total_received: u64,
}
impl ProgressiveBuffer {
#[must_use]
pub fn new(max_size: usize) -> Self {
Self {
data: VecDeque::with_capacity(max_size.min(64 * 1024)),
max_size,
total_received: 0,
}
}
pub fn append(&mut self, data: &[u8]) -> OxiResult<()> {
if self.data.len() + data.len() > self.max_size {
return Err(OxiError::BufferTooSmall {
needed: self.data.len() + data.len(),
have: self.max_size,
});
}
self.data.extend(data);
self.total_received += data.len() as u64;
Ok(())
}
pub fn consume(&mut self, count: usize) -> Option<Bytes> {
if count > self.data.len() {
return None;
}
let bytes: Vec<u8> = self.data.drain(..count).collect();
Some(Bytes::from(bytes))
}
#[must_use]
pub fn peek(&self, count: usize) -> Option<&[u8]> {
if count > self.data.len() {
return None;
}
let (first, _second) = self.data.as_slices();
if count <= first.len() {
Some(&first[..count])
} else {
None }
}
#[must_use]
pub fn len(&self) -> usize {
self.data.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
#[must_use]
pub const fn total_received(&self) -> u64 {
self.total_received
}
pub fn clear(&mut self) {
self.data.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_default() {
let config = StreamingDemuxerConfig::default();
assert_eq!(config.initial_buffer_size, 64 * 1024);
assert_eq!(config.max_buffer_size, 10 * 1024 * 1024);
assert!(!config.low_latency);
}
#[test]
fn test_config_builder() {
let config = StreamingDemuxerConfig::new()
.with_low_latency(true)
.with_initial_buffer(128 * 1024)
.with_max_buffer(20 * 1024 * 1024)
.with_timeout(10000);
assert!(config.low_latency);
assert_eq!(config.initial_buffer_size, 128 * 1024);
assert_eq!(config.max_buffer_size, 20 * 1024 * 1024);
assert_eq!(config.read_timeout_ms, 10000);
}
#[test]
fn test_progressive_buffer() {
let mut buffer = ProgressiveBuffer::new(1024);
assert!(buffer.is_empty());
assert_eq!(buffer.len(), 0);
buffer
.append(&[1, 2, 3, 4])
.expect("operation should succeed");
assert_eq!(buffer.len(), 4);
assert_eq!(buffer.total_received(), 4);
let peeked = buffer.peek(2).expect("operation should succeed");
assert_eq!(peeked, &[1, 2]);
assert_eq!(buffer.len(), 4);
let consumed = buffer.consume(2).expect("operation should succeed");
assert_eq!(consumed.as_ref(), &[1, 2]);
assert_eq!(buffer.len(), 2);
buffer.clear();
assert!(buffer.is_empty());
}
#[test]
fn test_progressive_buffer_overflow() {
let mut buffer = ProgressiveBuffer::new(10);
assert!(buffer.append(&[1, 2, 3, 4, 5]).is_ok());
assert!(buffer.append(&[6, 7, 8, 9, 10, 11]).is_err());
}
}