#![allow(dead_code)]
use crate::buffered_reader::BufferedMediaReader;
use crate::format_detector::{FormatDetection, FormatDetector, MediaFormat};
/// One named step of an [`IoPipeline`].
///
/// Inside [`IoPipeline::execute`] only [`IoStage::Buffer`] has an effect
/// (it reserves extra capacity); every other variant is only recorded by name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IoStage {
    /// Reported as `"read"`.
    Read,
    /// Reported as `"decompress"`.
    Decompress,
    /// Reported as `"validate"`.
    Validate,
    /// Reported as `"decrypt"`.
    Decrypt,
    /// Reported as `"buffer"`.
    Buffer,
    /// Reported as `"write"`.
    Write,
    /// A caller-defined stage, reported under the name it was built with.
    Custom(String),
}
impl IoStage {
    /// Returns the identifier recorded for this stage.
    ///
    /// Built-in variants map to fixed lowercase names; [`IoStage::Custom`]
    /// borrows its own label.
    #[must_use]
    pub fn stage_name(&self) -> &str {
        match self {
            Self::Custom(label) => label.as_str(),
            Self::Read => "read",
            Self::Decompress => "decompress",
            Self::Validate => "validate",
            Self::Decrypt => "decrypt",
            Self::Buffer => "buffer",
            Self::Write => "write",
        }
    }
}
/// Summary of a pipeline run.
///
/// As filled in by [`IoPipeline::execute`]: `bytes_processed` is the input
/// buffer length, `elapsed_ms` is copied from the caller's argument, and
/// `success` is always `true`.
#[derive(Debug, Clone)]
pub struct IoResult {
    /// Number of bytes the run handled.
    pub bytes_processed: u64,
    /// Wall-clock time of the run in milliseconds.
    pub elapsed_ms: u64,
    /// Names of the stages that ran, in order.
    pub stages_executed: Vec<String>,
    /// Whether the run succeeded.
    pub success: bool,
}
impl IoResult {
    /// Throughput in mebibytes per second: `bytes / 1024² / (elapsed_ms / 1000)`.
    ///
    /// Despite the `mbps` suffix this is MiB/s (bytes), not megabits per second.
    /// Returns `0.0` when `elapsed_ms` is zero rather than dividing by zero.
    // u64 -> f64 rounds above 2^53; acceptable for a throughput figure.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn throughput_mbps(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
        match self.elapsed_ms {
            0 => 0.0,
            millis => {
                let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
                let seconds = millis as f64 / 1000.0;
                mebibytes / seconds
            }
        }
    }
}
/// An ordered list of [`IoStage`]s that can be run over a byte buffer.
#[derive(Debug, Default)]
pub struct IoPipeline {
    stages: Vec<IoStage>,
}
impl IoPipeline {
    /// Creates a pipeline with no stages.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
    /// Appends `stage` and returns `self` so calls can be chained.
    pub fn add_stage(&mut self, stage: IoStage) -> &mut Self {
        self.stages.push(stage);
        self
    }
    /// Number of stages currently in the pipeline.
    #[must_use]
    pub fn stage_count(&self) -> usize {
        self.stages.len()
    }
    /// Runs every stage in insertion order over `data` and reports the run.
    ///
    /// [`IoStage::Buffer`] reserves 64 more bytes of capacity in `data`; every
    /// other stage leaves `data` untouched and is only recorded by name. The
    /// length of `data` never changes, so `bytes_processed` is its length on
    /// entry. `elapsed_ms` is not measured here; it is copied verbatim into the
    /// result, and `success` is always `true`.
    pub fn execute(&self, data: &mut Vec<u8>, elapsed_ms: u64) -> IoResult {
        let bytes_processed = data.len() as u64;
        let mut stages_executed: Vec<String> = Vec::with_capacity(self.stages.len());
        for stage in &self.stages {
            if let IoStage::Buffer = stage {
                data.reserve(64);
            }
            stages_executed.push(String::from(stage.stage_name()));
        }
        IoResult {
            bytes_processed,
            elapsed_ms,
            stages_executed,
            success: true,
        }
    }
    /// Names of all stages, in insertion order.
    #[must_use]
    pub fn stage_names(&self) -> Vec<&str> {
        self.stages.iter().map(|stage| stage.stage_name()).collect()
    }
}
/// Tunables carried by a `MediaIoPipeline`.
///
/// NOTE(review): `MediaIoPipeline::probe` does not read any of these fields.
#[derive(Debug, Clone)]
pub struct IoPipelineConfig {
    /// Buffer size; defaults to 65 536 (64 Ki, presumably bytes).
    pub buffer_size: usize,
    /// Read-ahead window in bytes; defaults to 262 144 (256 KiB).
    pub read_ahead_bytes: usize,
    /// Timeout in milliseconds; defaults to 30 000 (30 s).
    pub timeout_ms: u64,
}
impl Default for IoPipelineConfig {
    fn default() -> Self {
        const KIB: usize = 1024;
        const MS_PER_SEC: u64 = 1000;
        Self {
            buffer_size: 64 * KIB,
            read_ahead_bytes: 256 * KIB,
            timeout_ms: 30 * MS_PER_SEC,
        }
    }
}
/// What `MediaIoPipeline::probe` learned about a byte buffer.
#[derive(Debug, Clone)]
pub struct MediaProbeResult {
    /// Format reported by format detection.
    pub format: MediaFormat,
    /// Duration in milliseconds, when parsed from a header or estimated from size.
    pub duration_ms: Option<u64>,
    /// Bitrate in kilobits per second, when parsed or estimated.
    pub bitrate_kbps: Option<u32>,
    /// Whether the format is expected to carry video (decided from the format
    /// alone, not from inspecting streams).
    pub has_video: bool,
    /// Whether the format is expected to carry audio (decided from the format alone).
    pub has_audio: bool,
    /// Best-guess video codec name, if any was assigned.
    pub video_codec_hint: Option<String>,
    /// Best-guess audio codec name, if any was assigned.
    pub audio_codec_hint: Option<String>,
    /// Container version string, if any was assigned.
    pub container_version: Option<String>,
    /// Length of the probed buffer in bytes.
    pub file_size_bytes: usize,
    /// Full result of format detection.
    pub detection: FormatDetection,
}
impl MediaProbeResult {
    /// Delegates to `MediaFormat::is_video` for the detected format.
    #[must_use]
    pub fn is_video(&self) -> bool {
        let detected = self.format;
        detected.is_video()
    }
    /// Delegates to `MediaFormat::is_audio` for the detected format.
    #[must_use]
    pub fn is_audio(&self) -> bool {
        let detected = self.format;
        detected.is_audio()
    }
    /// Delegates to `MediaFormat::is_image` for the detected format.
    #[must_use]
    pub fn is_image(&self) -> bool {
        let detected = self.format;
        detected.is_image()
    }
}
/// Probes in-memory media buffers.
///
/// Probing detects the format, then runs a format-specific header parser or a
/// size-based estimate to fill a [`MediaProbeResult`].
#[derive(Debug)]
pub struct MediaIoPipeline {
    /// Configuration supplied at construction (not consulted by `probe`).
    pub config: IoPipelineConfig,
    /// Detector built by `new`. `probe` calls the associated function
    /// `FormatDetector::detect` and does not go through this field.
    pub format_detector: FormatDetector,
}
impl MediaIoPipeline {
    /// Creates a pipeline with the given configuration and a fresh detector.
    #[must_use]
    pub fn new(config: IoPipelineConfig) -> Self {
        let format_detector = FormatDetector::new();
        Self {
            config,
            format_detector,
        }
    }
    /// Detects the format of `data` and extracts whatever metadata it can.
    ///
    /// The dispatch works as follows:
    /// * MP4/MOV, FLAC, WAV and MKV/WebM go to their header parsers.
    /// * Every other format goes to a typical-bitrate size estimate.
    ///
    /// A header parser that finds nothing leaves `duration_ms` and
    /// `bitrate_kbps` as `None`. There is no fallback to the estimate.
    #[must_use]
    pub fn probe(&self, data: &[u8]) -> MediaProbeResult {
        let detection = FormatDetector::detect(data);
        let format = detection.format;
        let (has_video, has_audio) = format_av_flags(format);
        let mut probed = MediaProbeResult {
            format,
            duration_ms: None,
            bitrate_kbps: None,
            has_video,
            has_audio,
            video_codec_hint: None,
            audio_codec_hint: None,
            container_version: None,
            file_size_bytes: data.len(),
            detection,
        };
        match format {
            MediaFormat::Mp4 | MediaFormat::Mov => probe_mp4(data, &mut probed),
            MediaFormat::Flac => probe_flac(data, &mut probed),
            MediaFormat::Wav => probe_wav(data, &mut probed),
            MediaFormat::Mkv | MediaFormat::Webm => probe_mkv(data, &mut probed),
            other => estimate_from_size(data.len(), other, &mut probed),
        }
        probed
    }
    /// Alias for [`Self::probe`].
    #[must_use]
    pub fn probe_bytes(&self, data: &[u8]) -> MediaProbeResult {
        Self::probe(self, data)
    }
}
/// Returns `(has_video, has_audio)` for `format`, judged from the format alone.
///
/// The flags fall into three groups:
/// * Audio/video containers report both flags.
/// * Ogg and the audio-only formats report audio only.
/// * Everything else (images, archives, unknown) reports neither.
fn format_av_flags(format: MediaFormat) -> (bool, bool) {
    let av_container = matches!(
        format,
        MediaFormat::Mp4
            | MediaFormat::Mov
            | MediaFormat::Mkv
            | MediaFormat::Webm
            | MediaFormat::Avi
            | MediaFormat::Flv
            | MediaFormat::Ts
            | MediaFormat::M2ts
            | MediaFormat::Mxf
    );
    let audio_only = matches!(
        format,
        MediaFormat::Ogg
            | MediaFormat::Mp3
            | MediaFormat::Flac
            | MediaFormat::Wav
            | MediaFormat::Aac
            | MediaFormat::Opus
            | MediaFormat::Vorbis
            | MediaFormat::Aiff
            | MediaFormat::Au
    );
    (av_container, av_container || audio_only)
}
/// Parses the movie header (`mvhd`) of an ISO-BMFF (MP4/MOV) buffer into `result`.
///
/// Boxes are walked sequentially from the start of `data`:
/// * The `moov` container is entered rather than skipped, because `mvhd` is one
///   of its children.
/// * Every other box is skipped using its size field. This includes the 64-bit
///   `largesize` form (size field == 1) that large `mdat` boxes use.
/// * A size of 0 ("runs to end of file"), or any size smaller than the box
///   header, ends the scan.
///
/// Once `mvhd` has been read in full:
/// * `duration_ms` is set to `duration * 1000 / timescale`. This is skipped when
///   the timescale is zero or the duration holds the all-ones "unknown" marker.
///   The intermediate math is done in `u128`, so a 64-bit duration cannot overflow.
/// * `bitrate_kbps` is the whole-buffer average over that duration, saturating
///   at `u32::MAX`.
/// * The codec hints are set to the fixed guesses `"AVC/H.264"` and `"AAC"`.
///   They are not read from the sample descriptions.
fn probe_mp4(data: &[u8], result: &mut MediaProbeResult) {
    let mut reader = BufferedMediaReader::from_bytes(data.to_vec());
    while reader.remaining() >= 8 {
        let size_field = match reader.read_u32_be() {
            Some(s) => s,
            None => break,
        };
        let box_type = match reader.read_exact(4) {
            Some(t) => t,
            None => break,
        };
        // (total box size, header length) — size field 1 means a u64 largesize follows.
        let (box_size, header_len): (u64, u64) = if size_field == 1 {
            match reader.read_u64_be() {
                Some(large) => (large, 16),
                None => break,
            }
        } else {
            (u64::from(size_field), 8)
        };
        if &box_type == b"moov" {
            // Container box: descend into its children instead of skipping them.
            continue;
        }
        if &box_type == b"mvhd" {
            let version = match reader.read_u8() {
                Some(v) => v,
                None => break,
            };
            reader.skip(3); // flags
            let (timescale, duration_units) = if version == 1 {
                reader.skip(8); // creation_time (64-bit)
                reader.skip(8); // modification_time (64-bit)
                let timescale = match reader.read_u32_be() {
                    Some(t) => t,
                    None => break,
                };
                let duration = match reader.read_u64_be() {
                    Some(d) => d,
                    None => break,
                };
                // All bits set means "duration unknown".
                (timescale, if duration == u64::MAX { None } else { Some(duration) })
            } else {
                reader.skip(4); // creation_time (32-bit)
                reader.skip(4); // modification_time (32-bit)
                let timescale = match reader.read_u32_be() {
                    Some(t) => t,
                    None => break,
                };
                let duration = match reader.read_u32_be() {
                    Some(d) => d,
                    None => break,
                };
                // All bits set means "duration unknown".
                let known = if duration == u32::MAX {
                    None
                } else {
                    Some(u64::from(duration))
                };
                (timescale, known)
            };
            if let (Some(units), true) = (duration_units, timescale > 0) {
                let ms = u128::from(units) * 1000 / u128::from(timescale);
                let ms = ms.min(u128::from(u64::MAX)) as u64;
                result.duration_ms = Some(ms);
                if ms > 0 {
                    #[allow(clippy::cast_precision_loss)]
                    let kbps = (data.len() as f64 * 8.0 / (ms as f64 / 1000.0)) as u64 / 1000;
                    // Saturate instead of wrapping on degenerate (tiny) durations.
                    result.bitrate_kbps = Some(kbps.min(u64::from(u32::MAX)) as u32);
                }
            }
            result.video_codec_hint = Some("AVC/H.264".to_string());
            result.audio_codec_hint = Some("AAC".to_string());
            break;
        }
        // Size 0 (box runs to EOF) or a size smaller than its own header: stop.
        if box_size < header_len {
            break;
        }
        let payload = box_size - header_len;
        if payload > usize::MAX as u64 {
            break;
        }
        let skip_bytes = payload as usize;
        if reader.skip(skip_bytes) < skip_bytes {
            break;
        }
    }
}
/// Reads duration and bitrate from a FLAC stream's STREAMINFO block.
///
/// `data` must start with the `fLaC` marker, followed immediately by the
/// STREAMINFO metadata block: a 4-byte block header and a 34-byte body. The
/// FLAC format requires STREAMINFO to be the first block. If the buffer is too
/// short, or the marker or block type does not match, `result` is left
/// untouched, including the codec hint.
///
/// `bitrate_kbps` is the average over the whole buffer (size / duration). It
/// saturates at `u32::MAX` rather than wrapping for degenerate headers.
fn probe_flac(data: &[u8], result: &mut MediaProbeResult) {
    const STREAMINFO_LEN: usize = 34;
    // "fLaC" (4) + metadata block header (4) + STREAMINFO body (34).
    if data.len() < 8 + STREAMINFO_LEN || &data[..4] != b"fLaC" {
        return;
    }
    // Top bit of the header's first byte is the last-block flag; the low 7 bits
    // are the block type, and 0 is STREAMINFO.
    if (data[4] & 0x7F) != 0 {
        return;
    }
    let si = &data[8..8 + STREAMINFO_LEN];
    // Upper 20 bits of bytes 10..=12: sample rate in Hz.
    let sample_rate =
        (u32::from(si[10]) << 12) | (u32::from(si[11]) << 4) | (u32::from(si[12]) >> 4);
    // Low nibble of byte 13 plus bytes 14..=17: 36-bit total sample count.
    let total_samples: u64 = ((u64::from(si[13]) & 0x0F) << 32)
        | (u64::from(si[14]) << 24)
        | (u64::from(si[15]) << 16)
        | (u64::from(si[16]) << 8)
        | u64::from(si[17]);
    if sample_rate > 0 && total_samples > 0 {
        // total_samples < 2^36, so `* 1000` cannot overflow u64.
        result.duration_ms = Some(total_samples * 1000 / u64::from(sample_rate));
        #[allow(clippy::cast_precision_loss)]
        let kbps = (data.len() as f64 * 8.0 / (total_samples as f64 / f64::from(sample_rate)))
            as u64
            / 1000;
        result.bitrate_kbps = Some(kbps.min(u64::from(u32::MAX)) as u32);
    }
    result.audio_codec_hint = Some("FLAC".to_string());
}
/// Reads the `fmt ` and `data` chunks of a RIFF/WAVE buffer into `result`.
///
/// Chunks are walked from offset 12, just after the `RIFF`/size/`WAVE`
/// preamble, until the `data` chunk header is reached. The outputs are:
/// * `duration_ms`: the `data` payload size divided by the byte rate implied by
///   the `fmt ` fields (sample rate × channels × whole bytes per sample).
/// * `bitrate_kbps`: that byte rate in kilobits per second, saturating at
///   `u32::MAX`.
///
/// The arithmetic is done in `u64`, so hostile header values cannot overflow
/// (which panics in debug builds) or wrap.
///
/// Once the preamble matches, the codec hint is always `"PCM"`; the format tag
/// in `fmt ` is not inspected. A buffer shorter than 44 bytes, or one without
/// the preamble, leaves `result` untouched.
fn probe_wav(data: &[u8], result: &mut MediaProbeResult) {
    if data.len() < 44 {
        return;
    }
    if &data[..4] != b"RIFF" || &data[8..12] != b"WAVE" {
        return;
    }
    let mut reader = BufferedMediaReader::from_bytes(data.to_vec());
    reader.seek(12);
    let mut num_channels: u16 = 0;
    let mut sample_rate: u32 = 0;
    let mut bits_per_sample: u16 = 0;
    let mut data_size: u32 = 0;
    loop {
        if reader.remaining() < 8 {
            break;
        }
        let chunk_id = match reader.read_exact(4) {
            Some(id) => id,
            None => break,
        };
        let chunk_size = match reader.read_u32_le() {
            Some(s) => s,
            None => break,
        };
        // RIFF chunks are word-aligned: an odd-sized payload is followed by one pad byte.
        let padded_size = (chunk_size as usize).saturating_add((chunk_size & 1) as usize);
        if &chunk_id == b"fmt " {
            if chunk_size < 16 {
                break;
            }
            reader.skip(2); // wFormatTag
            num_channels = reader.read_u16_le().unwrap_or(0);
            sample_rate = reader.read_u32_le().unwrap_or(0);
            reader.skip(4); // nAvgBytesPerSec
            reader.skip(2); // nBlockAlign
            bits_per_sample = reader.read_u16_le().unwrap_or(16);
            // Skip any format extension bytes (and padding) past the 16 read above.
            reader.skip(padded_size - 16);
        } else if &chunk_id == b"data" {
            data_size = chunk_size;
            break;
        } else {
            reader.skip(padded_size);
        }
    }
    if num_channels > 0 && bits_per_sample > 0 && sample_rate > 0 && data_size > 0 {
        // Round partial bytes up (e.g. 12-bit samples occupy 2 bytes).
        let bytes_per_sample = (u64::from(bits_per_sample) + 7) / 8;
        // At most u32::MAX * u16::MAX * 8192, which fits in u64.
        let bytes_per_sec = u64::from(sample_rate) * u64::from(num_channels) * bytes_per_sample;
        // Every factor is non-zero (checked above), so bytes_per_sec > 0.
        result.duration_ms = Some(u64::from(data_size) * 1000 / bytes_per_sec);
        let kbps = bytes_per_sec.saturating_mul(8) / 1000;
        result.bitrate_kbps = Some(kbps.min(u64::from(u32::MAX)) as u32);
    }
    result.audio_codec_hint = Some("PCM".to_string());
}
/// Scans the first 256 KiB of a Matroska/WebM buffer for the Segment Info
/// `Duration` element (ID `0x44 0x89`) and fills `duration_ms` / `bitrate_kbps`.
///
/// Only a one-byte size descriptor is understood: `0x88` for an 8-byte float
/// and `0x84` for a 4-byte float, the two float widths EBML allows. Any other
/// match, or one truncated by the scan window, is skipped and the scan
/// continues. The first complete float ends the scan. It sets the fields only
/// if it is finite and positive. The bitrate is a whole-buffer average,
/// saturating at `u32::MAX`.
///
/// NOTE(review): the value is treated as milliseconds. That is correct only
/// for the default `TimestampScale` of 1 000 000 ns; a non-default scale is
/// not read here.
///
/// The codec hints (`"VP9"` / `"Opus"`) and container version (`"4"`) are
/// fixed placeholders. They are always set and are not read from the file.
fn probe_mkv(data: &[u8], result: &mut MediaProbeResult) {
    const SCAN_LIMIT: usize = 262_144;
    let haystack = &data[..data.len().min(SCAN_LIMIT)];
    // Each window is [id_hi, id_lo, size_descriptor].
    for (pos, window) in haystack.windows(3).enumerate() {
        if window[0] != 0x44 || window[1] != 0x89 {
            continue;
        }
        let value_start = pos + 3;
        let duration = match window[2] {
            0x88 => haystack.get(value_start..value_start + 8).map(|bytes| {
                let mut raw = [0u8; 8];
                raw.copy_from_slice(bytes);
                f64::from_be_bytes(raw)
            }),
            0x84 => haystack.get(value_start..value_start + 4).map(|bytes| {
                let mut raw = [0u8; 4];
                raw.copy_from_slice(bytes);
                f64::from(f32::from_be_bytes(raw))
            }),
            _ => None,
        };
        if let Some(duration) = duration {
            if duration.is_finite() && duration > 0.0 {
                result.duration_ms = Some(duration as u64);
                #[allow(clippy::cast_precision_loss)]
                let kbps = (data.len() as f64 * 8.0 / (duration / 1000.0)) as u64 / 1000;
                result.bitrate_kbps = Some(kbps.min(u64::from(u32::MAX)) as u32);
            }
            break;
        }
    }
    result.video_codec_hint = Some("VP9".to_string());
    result.audio_codec_hint = Some("Opus".to_string());
    result.container_version = Some("4".to_string());
}
/// Fills `bitrate_kbps` and `duration_ms` from a typical bitrate for `format`.
///
/// Formats missing from the table leave `result` untouched. The duration is
/// `file_size * 8 / kbps`: total bits divided by kilobits per second, which
/// gives milliseconds.
fn estimate_from_size(file_size: usize, format: MediaFormat, result: &mut MediaProbeResult) {
    let kbps: u32 = match format {
        MediaFormat::Mp3 => 192,
        MediaFormat::Aac => 256,
        MediaFormat::Opus => 96,
        MediaFormat::Vorbis => 160,
        MediaFormat::Flac => 800,
        // 1411 kbps matches 16-bit stereo PCM at 44.1 kHz.
        MediaFormat::Wav | MediaFormat::Aiff => 1411,
        MediaFormat::Mp4 | MediaFormat::Mkv => 2000,
        MediaFormat::Webm => 1500,
        MediaFormat::Avi => 2500,
        MediaFormat::Ts => 3000,
        _ => return,
    };
    result.bitrate_kbps = Some(kbps);
    let total_bits = file_size as u64 * 8;
    // checked_div guards a zero entry the same way an explicit `kbps > 0` test would.
    if let Some(duration_ms) = total_bits.checked_div(u64::from(kbps)) {
        result.duration_ms = Some(duration_ms);
    }
}
// Unit tests for the I/O stage pipeline and the media probe.
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_stage_name_read() {
        assert_eq!(IoStage::Read.stage_name(), "read");
    }
    #[test]
    fn test_stage_name_decompress() {
        assert_eq!(IoStage::Decompress.stage_name(), "decompress");
    }
    #[test]
    fn test_stage_name_validate() {
        assert_eq!(IoStage::Validate.stage_name(), "validate");
    }
    #[test]
    fn test_stage_name_decrypt() {
        assert_eq!(IoStage::Decrypt.stage_name(), "decrypt");
    }
    #[test]
    fn test_stage_name_buffer() {
        assert_eq!(IoStage::Buffer.stage_name(), "buffer");
    }
    #[test]
    fn test_stage_name_write() {
        assert_eq!(IoStage::Write.stage_name(), "write");
    }
    #[test]
    fn test_stage_name_custom() {
        let s = IoStage::Custom("my_stage".to_string());
        assert_eq!(s.stage_name(), "my_stage");
    }
    #[test]
    fn test_empty_pipeline() {
        let p = IoPipeline::new();
        assert_eq!(p.stage_count(), 0);
        assert!(p.stage_names().is_empty());
    }
    #[test]
    fn test_add_stages() {
        let mut p = IoPipeline::new();
        p.add_stage(IoStage::Read).add_stage(IoStage::Decompress);
        assert_eq!(p.stage_count(), 2);
        assert_eq!(p.stage_names(), vec!["read", "decompress"]);
    }
    #[test]
    fn test_execute_records_stages() {
        let mut p = IoPipeline::new();
        p.add_stage(IoStage::Read)
            .add_stage(IoStage::Validate)
            .add_stage(IoStage::Write);
        let mut data = vec![1u8, 2, 3, 4];
        let result = p.execute(&mut data, 100);
        assert!(result.success);
        assert_eq!(result.stages_executed, vec!["read", "validate", "write"]);
        assert_eq!(result.bytes_processed, 4);
        assert_eq!(result.elapsed_ms, 100);
    }
    #[test]
    fn test_throughput_mbps_zero_elapsed() {
        let r = IoResult {
            bytes_processed: 1024 * 1024,
            elapsed_ms: 0,
            stages_executed: vec![],
            success: true,
        };
        assert_eq!(r.throughput_mbps(), 0.0);
    }
    #[test]
    fn test_throughput_mbps_one_second() {
        let r = IoResult {
            bytes_processed: 1024 * 1024,
            elapsed_ms: 1000,
            stages_executed: vec![],
            success: true,
        };
        let mbps = r.throughput_mbps();
        assert!((mbps - 1.0).abs() < 1e-9);
    }
    #[test]
    fn test_throughput_mbps_two_mib_half_second() {
        let r = IoResult {
            bytes_processed: 2 * 1024 * 1024,
            elapsed_ms: 500,
            stages_executed: vec![],
            success: true,
        };
        let mbps = r.throughput_mbps();
        assert!((mbps - 4.0).abs() < 1e-9);
    }
    #[test]
    fn test_execute_buffer_stage() {
        let mut p = IoPipeline::new();
        p.add_stage(IoStage::Buffer);
        let mut data = vec![0u8; 10];
        let result = p.execute(&mut data, 50);
        assert!(result.success);
        assert_eq!(result.bytes_processed, 10);
    }
    #[test]
    fn test_execute_custom_stage() {
        let mut p = IoPipeline::new();
        p.add_stage(IoStage::Custom("transcode".to_string()));
        let mut data = vec![9u8; 5];
        let result = p.execute(&mut data, 200);
        assert_eq!(result.stages_executed, vec!["transcode"]);
    }
    #[test]
    fn test_pipeline_config_default() {
        let cfg = IoPipelineConfig::default();
        assert_eq!(cfg.buffer_size, 65_536);
        assert_eq!(cfg.read_ahead_bytes, 262_144);
        assert_eq!(cfg.timeout_ms, 30_000);
    }
    #[test]
    fn test_probe_jpeg() {
        let cfg = IoPipelineConfig::default();
        let pipeline = MediaIoPipeline::new(cfg);
        let data = vec![0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10];
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Jpeg);
        assert!(result.is_image());
        assert!(!result.has_video);
        assert!(!result.has_audio);
    }
    #[test]
    fn test_probe_png() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = vec![0x89, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A, 0x00];
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Png);
        assert!(result.is_image());
    }
    #[test]
    fn test_probe_flac() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let mut data: Vec<u8> = b"fLaC".to_vec();
        // Metadata block header: last-block flag set, type 0 (STREAMINFO), length 34.
        data.extend_from_slice(&[0x80, 0x00, 0x00, 0x22]);
        // Min / max block size: 4096 samples each.
        data.extend_from_slice(&[0x10, 0x00, 0x10, 0x00]);
        // Min / max frame size: unknown (0).
        data.extend_from_slice(&[0x00; 6]);
        // Packed fields:
        // - 20-bit sample rate 44100 (0x0AC44)
        // - 3-bit channels-1 = 1 (stereo)
        // - 5-bit bits-per-sample-1 = 15 (16-bit)
        // - 36-bit total sample count 44100 (0xAC44), i.e. exactly one second
        data.extend_from_slice(&[0x0A, 0xC4, 0x42, 0xF0, 0x00, 0x00, 0xAC, 0x44]);
        // MD5 signature of the unencoded audio (unused).
        data.extend_from_slice(&[0u8; 16]);
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Flac);
        assert!(result.has_audio);
        assert!(!result.has_video);
        assert_eq!(result.audio_codec_hint, Some("FLAC".to_string()));
        assert_eq!(result.duration_ms, Some(1000));
    }
    #[test]
    fn test_probe_wav() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data_size: u32 = 176_400;
        let mut data: Vec<u8> = b"RIFF".to_vec();
        let riff_size: u32 = 36 + data_size;
        data.extend_from_slice(&riff_size.to_le_bytes());
        data.extend_from_slice(b"WAVE");
        data.extend_from_slice(b"fmt ");
        data.extend_from_slice(&16u32.to_le_bytes()); // fmt chunk size
        data.extend_from_slice(&1u16.to_le_bytes()); // format tag: PCM
        data.extend_from_slice(&2u16.to_le_bytes()); // channels
        data.extend_from_slice(&44100u32.to_le_bytes()); // sample rate
        let byte_rate: u32 = 44100 * 2 * 2;
        data.extend_from_slice(&byte_rate.to_le_bytes()); // average bytes per second
        data.extend_from_slice(&4u16.to_le_bytes()); // block align
        data.extend_from_slice(&16u16.to_le_bytes()); // bits per sample
        data.extend_from_slice(b"data");
        data.extend_from_slice(&data_size.to_le_bytes());
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Wav);
        assert!(result.has_audio);
        assert!(!result.has_video);
        let dur = result.duration_ms.expect("should have duration");
        assert_eq!(dur, 1000);
        // 176 400 B/s * 8 / 1000 = 1411 kbps (integer division).
        assert_eq!(result.bitrate_kbps, Some(1411));
        assert_eq!(result.audio_codec_hint, Some("PCM".to_string()));
    }
    #[test]
    fn test_probe_mp3_fallback() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = b"ID3\x04\x00\x00\x00\x00\x00\x00".to_vec();
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Mp3);
        assert!(result.has_audio);
        assert_eq!(result.bitrate_kbps, Some(192));
    }
    #[test]
    fn test_probe_zip_no_av() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = vec![0x50u8, 0x4B, 0x03, 0x04, 0x00];
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Zip);
        assert!(!result.has_video);
        assert!(!result.has_audio);
    }
    #[test]
    fn test_probe_bytes_same_as_probe() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = vec![0xFF, 0xD8, 0xFF];
        let r1 = pipeline.probe(&data);
        let r2 = pipeline.probe_bytes(&data);
        assert_eq!(r1.format, r2.format);
    }
    #[test]
    fn test_probe_unknown_empty() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let result = pipeline.probe(&[]);
        assert_eq!(result.format, MediaFormat::Unknown);
        assert!(!result.has_video);
        assert!(!result.has_audio);
    }
    #[test]
    fn test_probe_result_file_size() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = vec![0u8; 1024];
        let result = pipeline.probe(&data);
        assert_eq!(result.file_size_bytes, 1024);
    }
    #[test]
    fn test_probe_flv() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = b"FLV\x01\x05\x00\x00\x00\x09\x00\x00\x00\x00".to_vec();
        let result = pipeline.probe(&data);
        assert_eq!(result.format, MediaFormat::Flv);
        assert!(result.has_video);
        assert!(result.has_audio);
    }
    #[test]
    fn test_media_probe_result_is_image_helper() {
        let pipeline = MediaIoPipeline::new(IoPipelineConfig::default());
        let data = [0x89u8, b'P', b'N', b'G', 0x0D, 0x0A, 0x1A, 0x0A];
        let result = pipeline.probe(&data);
        assert!(result.is_image());
        assert!(!result.is_video());
        assert!(!result.is_audio());
    }
}