use super::types::*;
use crate::*;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::time::Duration;
use tokio::time::Instant;
use std::sync::Arc;
/// Errors reported by the media codec.
///
/// Within this file only `InvalidData` is constructed (empty input, mixed
/// data that is too short, checksum mismatch); the other variants are
/// available to callers and to code outside this file.
#[derive(Debug, Clone)]
pub enum CodecError {
/// A format is not supported; the payload is shown after "Unsupported format: ".
UnsupportedFormat(String),
/// Encoding failed; the payload is the reason shown in the message.
EncodingFailed(String),
/// Decoding failed; the payload is the reason shown in the message.
DecodingFailed(String),
/// The input was rejected as invalid; the payload is the reason shown in the message.
InvalidData(String),
/// A resource limit was hit; carries no further detail.
ResourceExhausted,
}
impl std::fmt::Display for CodecError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CodecError::UnsupportedFormat(format) => write!(f, "Unsupported format: {}", format),
CodecError::EncodingFailed(reason) => write!(f, "Encoding failed: {}", reason),
CodecError::DecodingFailed(reason) => write!(f, "Decoding failed: {}", reason),
CodecError::InvalidData(reason) => write!(f, "Invalid data: {}", reason),
CodecError::ResourceExhausted => write!(f, "Resource exhausted"),
}
}
}
// Marker impl: relies on the default `Error` methods, so `CodecError` can be
// used wherever a `std::error::Error` (e.g. `Box<dyn Error>`) is expected.
impl std::error::Error for CodecError {}
/// Settings that control how a `MediaCodec` encodes and decodes.
#[derive(Debug, Clone)]
pub struct EncodingConfig {
/// Quality tier. In this file it only selects the capacity estimate used
/// for the encoders' output buffers and the factory bitrate presets.
pub quality: QualityLevel,
/// Target bitrate. Not read by `MediaCodec` in this file.
/// NOTE(review): presumably bits per second — confirm.
pub target_bitrate: u32,
/// Keyframe spacing: a video frame is tagged as an I-frame when its
/// timestamp in milliseconds is a multiple of `keyframe_interval * 33`.
/// NOTE(review): presumably a frame count at ~30 fps — confirm.
pub keyframe_interval: u32,
/// When true the video encoder keeps every second byte and the video
/// decoder emits every byte twice; when false both copy bytes unchanged.
pub enable_compression: bool,
/// When true a SHA-256 checksum of the encoded payload is attached to
/// every encoded chunk.
pub preserve_metadata: bool,
}
impl Default for EncodingConfig {
fn default() -> Self {
Self {
quality: QualityLevel::Medium,
target_bitrate: 1_000_000, keyframe_interval: 30, enable_compression: true,
preserve_metadata: true,
}
}
}
/// One unit of unencoded media handed to (or produced by) the codec.
#[derive(Debug, Clone)]
pub struct RawMediaData {
/// Raw payload bytes.
pub data: Vec<u8>,
/// Kind of media in `data`; selects the encoding path.
pub media_type: MediaType,
/// Presentation timestamp; copied onto the encoded chunk and used to
/// decide whether a video frame is tagged as a keyframe.
pub timestamp: Duration,
/// Free-form key/value metadata. Not read by the codec in this file;
/// `decode_chunk` always returns an empty map here.
pub metadata: HashMap<String, String>,
}
/// Running counters kept by a `MediaCodec`; all fields start at zero.
#[derive(Debug, Clone, Default)]
pub struct CodecStats {
/// Number of frames encoded successfully.
pub frames_encoded: u64,
/// Number of chunks decoded successfully.
pub frames_decoded: u64,
/// Total size in bytes of the encoded output produced so far.
pub bytes_processed: u64,
/// Accumulated encoding time in milliseconds; each frame contributes at
/// least 1 ms.
pub encoding_time_ms: u64,
/// Running mean of `raw length / encoded length` over encoded frames.
pub average_compression_ratio: f64,
/// Counter reserved for failed operations.
pub error_count: u64,
}
/// Encoder/decoder driven by an `EncodingConfig`, with shared statistics.
pub struct MediaCodec {
// Settings used by every encode/decode call.
config: EncodingConfig,
// Statistics behind an `Arc`, so clones of this codec update the same counters.
stats: Arc<tokio::sync::Mutex<CodecStats>>,
}
impl Clone for MediaCodec {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
stats: Arc::clone(&self.stats),
}
}
}
impl MediaCodec {
/// Creates a codec with the given configuration and zeroed statistics.
pub fn new(config: EncodingConfig) -> Self {
    let stats = Arc::new(tokio::sync::Mutex::new(CodecStats::default()));
    Self { config, stats }
}
/// Encodes every item of `raw_data_stream` into a `MediaChunk` tagged with
/// `stream_id`, yielding one `Result` per input item.
///
/// The frames are mapped through `par_eval_map` with an argument of 4 and the
/// result is wrapped in `auto_backpressure_block` with an argument of 256.
/// NOTE(review): presumably 4 is the concurrency level and 256 the buffer
/// size — confirm against the RS2 documentation.
pub fn encode_stream(
    &self,
    raw_data_stream: RS2Stream<RawMediaData>,
    stream_id: String,
) -> RS2Stream<Result<MediaChunk, CodecError>> {
    // The closure must own its codec handle; clones share the same stats.
    let codec = self.clone();
    let encoded = par_eval_map(raw_data_stream, 4, move |frame| {
        let worker = codec.clone();
        let id = stream_id.clone();
        async move { worker.encode_single_frame(frame, id).await }
    });
    auto_backpressure_block(encoded, 256)
}
/// Encodes one raw frame into a `MediaChunk` and updates the shared stats.
///
/// On success `frames_encoded`, `bytes_processed`, `encoding_time_ms` and the
/// running `average_compression_ratio` are updated. On failure `error_count`
/// is incremented and the encoder's error is returned unchanged.
///
/// A SHA-256 checksum of the encoded payload is attached only when
/// `config.preserve_metadata` is set.
///
/// NOTE(review): `sequence_number` is always 0 and `is_final` always false
/// here; callers that need ordering must assign them downstream.
async fn encode_single_frame(
    &self,
    raw_data: RawMediaData,
    stream_id: String,
) -> Result<MediaChunk, CodecError> {
    let start_time = Instant::now();
    let encoded_data = match self.perform_encoding(&raw_data).await {
        Ok(bytes) => bytes,
        Err(err) => {
            // Record the failure; previously the early return left
            // `error_count` untouched, so errors never showed up in stats.
            self.stats.lock().await.error_count += 1;
            return Err(err);
        }
    };
    let chunk_type = self.determine_chunk_type(&raw_data, &encoded_data);
    let priority = chunk_type.default_priority();
    let checksum = if self.config.preserve_metadata {
        Some(self.calculate_checksum(&encoded_data))
    } else {
        None
    };
    {
        // Scoped so the guard is released before the chunk is built.
        let mut stats = self.stats.lock().await;
        stats.frames_encoded += 1;
        stats.bytes_processed += encoded_data.len() as u64;
        // Sub-millisecond frames still count as 1 ms so the total never stalls at 0.
        let elapsed_ms = start_time.elapsed().as_millis() as u64;
        stats.encoding_time_ms += elapsed_ms.max(1);
        // Every successful encoding path yields at least one byte, so the
        // divisor is non-zero.
        let compression_ratio = raw_data.data.len() as f64 / encoded_data.len() as f64;
        // Incremental mean over all frames encoded so far (this one included).
        stats.average_compression_ratio = (stats.average_compression_ratio
            * (stats.frames_encoded - 1) as f64
            + compression_ratio)
            / stats.frames_encoded as f64;
    }
    Ok(MediaChunk {
        stream_id,
        sequence_number: 0,
        data: encoded_data,
        chunk_type,
        priority,
        timestamp: raw_data.timestamp,
        is_final: false,
        checksum,
    })
}
/// Dispatches `raw_data` to the encoder matching its media type.
///
/// Sleeps for 100 microseconds first. Mixed input is split into a video and
/// an audio part, each part is encoded in turn (video first), and the two
/// results are joined with `combine_encoded_data`.
async fn perform_encoding(&self, raw_data: &RawMediaData) -> Result<Vec<u8>, CodecError> {
    tokio::time::sleep(Duration::from_micros(100)).await;
    let payload = raw_data.data.as_slice();
    match raw_data.media_type {
        MediaType::Video => self.encode_video(payload).await,
        MediaType::Audio => self.encode_audio(payload).await,
        MediaType::Mixed => {
            let (video_part, audio_part) = self.split_mixed_data(payload)?;
            let video = self.encode_video(&video_part).await?;
            let audio = self.encode_audio(&audio_part).await?;
            Ok(self.combine_encoded_data(video, audio))
        }
    }
}
/// Encodes video bytes.
///
/// Returns `CodecError::InvalidData` for empty input. With
/// `config.enable_compression` the output keeps every second byte of each
/// 8-byte group; otherwise it is a copy of the input. The quality level only
/// influences the initial capacity reserved for the output.
async fn encode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
    if data.is_empty() {
        return Err(CodecError::InvalidData("Empty video data".to_string()));
    }
    let ratio = match self.config.quality {
        QualityLevel::Low => 0.3,
        QualityLevel::Medium => 0.5,
        QualityLevel::High => 0.7,
        QualityLevel::UltraHigh => 0.9,
    };
    let capacity_hint = (data.len() as f64 * ratio) as usize;
    // Stride 2 drops every other byte within a group; stride 1 keeps all.
    let stride = if self.config.enable_compression { 2 } else { 1 };
    let mut encoded = Vec::with_capacity(capacity_hint);
    encoded.extend(
        data.chunks(8)
            .flat_map(|group| group.iter().step_by(stride).copied()),
    );
    Ok(encoded)
}
/// Encodes audio bytes by keeping the first byte of every 4-byte group.
///
/// Returns `CodecError::InvalidData` for empty input. The quality level only
/// influences the initial capacity reserved for the output.
async fn encode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
    if data.is_empty() {
        return Err(CodecError::InvalidData("Empty audio data".to_string()));
    }
    let ratio = match self.config.quality {
        QualityLevel::Low => 0.2,
        QualityLevel::Medium => 0.4,
        QualityLevel::High => 0.6,
        QualityLevel::UltraHigh => 0.8,
    };
    let capacity_hint = (data.len() as f64 * ratio) as usize;
    let mut encoded = Vec::with_capacity(capacity_hint);
    encoded.extend(data.chunks(4).filter_map(|group| group.first().copied()));
    Ok(encoded)
}
/// Splits mixed data into `(video, audio)`: the first 60% of the bytes
/// (rounded down) are video, the remainder is audio.
///
/// Returns `CodecError::InvalidData` when fewer than two bytes are supplied.
fn split_mixed_data(&self, data: &[u8]) -> Result<(Vec<u8>, Vec<u8>), CodecError> {
    if data.len() < 2 {
        return Err(CodecError::InvalidData("Mixed data too short".to_string()));
    }
    let split_point = (data.len() as f64 * 0.6) as usize;
    let (video_part, audio_part) = data.split_at(split_point);
    Ok((video_part.to_vec(), audio_part.to_vec()))
}
/// Joins encoded video and audio into one buffer laid out as
/// `[video_len: u32 LE][audio_len: u32 LE][video bytes][audio bytes]`.
///
/// The lengths are narrowed with `as u32`, so a section longer than
/// `u32::MAX` bytes would have its length field truncated.
fn combine_encoded_data(&self, video: Vec<u8>, audio: Vec<u8>) -> Vec<u8> {
    let video_len = (video.len() as u32).to_le_bytes();
    let audio_len = (audio.len() as u32).to_le_bytes();
    // 8 = two 4-byte length prefixes.
    let mut combined = Vec::with_capacity(video.len() + audio.len() + 8);
    combined.extend_from_slice(&video_len);
    combined.extend_from_slice(&audio_len);
    combined.extend_from_slice(&video);
    combined.extend_from_slice(&audio);
    combined
}
/// Chooses the chunk type for a frame.
///
/// * Audio input is always `ChunkType::Audio`.
/// * Mixed input is always `ChunkType::VideoIFrame`.
/// * Video input is an I-frame when its timestamp in milliseconds is a
///   multiple of `keyframe_interval * 33`, otherwise a P-frame.
///
/// A `keyframe_interval` of 0 (as configured by
/// `CodecFactory::create_audio_codec`) would make the keyframe period zero.
/// Such frames are tagged as I-frames rather than taking a remainder by
/// zero, which panics.
///
/// `_encoded_data` is currently unused.
fn determine_chunk_type(&self, raw_data: &RawMediaData, _encoded_data: &[u8]) -> ChunkType {
    match raw_data.media_type {
        MediaType::Audio => ChunkType::Audio,
        MediaType::Video => {
            let period_ms = u128::from(self.config.keyframe_interval) * 33;
            // Guard the zero period first: `x % 0` panics. With no period
            // every frame is treated as independently decodable.
            if period_ms == 0 || raw_data.timestamp.as_millis() % period_ms == 0 {
                ChunkType::VideoIFrame
            } else {
                ChunkType::VideoPFrame
            }
        }
        MediaType::Mixed => ChunkType::VideoIFrame,
    }
}
/// Returns the SHA-256 digest of `data` as a lowercase hexadecimal string.
fn calculate_checksum(&self, data: &[u8]) -> String {
    let digest = Sha256::digest(data);
    format!("{:x}", digest)
}
/// Decodes one chunk back into raw media data.
///
/// When the chunk carries a checksum it is verified against the chunk's
/// payload *before* any decoding work is done. A mismatch increments
/// `error_count` and returns `CodecError::InvalidData("Checksum mismatch")`.
///
/// Audio chunks go through the audio decoder; I/P/B frames and thumbnails go
/// through the video decoder; metadata chunks are passed through unchanged.
/// The result is reported as `MediaType::Audio` for audio chunks and
/// `MediaType::Video` for everything else, with an empty metadata map.
/// `frames_decoded` is incremented on success.
pub async fn decode_chunk(&self, chunk: MediaChunk) -> Result<RawMediaData, CodecError> {
    // Fail fast: a corrupted payload is rejected without being decoded.
    if let Some(expected_checksum) = &chunk.checksum {
        let actual_checksum = self.calculate_checksum(&chunk.data);
        if actual_checksum != *expected_checksum {
            self.stats.lock().await.error_count += 1;
            return Err(CodecError::InvalidData("Checksum mismatch".to_string()));
        }
    }
    let decoded_data = match chunk.chunk_type {
        ChunkType::Audio => self.decode_audio(&chunk.data).await?,
        ChunkType::VideoIFrame
        | ChunkType::VideoPFrame
        | ChunkType::VideoBFrame
        | ChunkType::Thumbnail => self.decode_video(&chunk.data).await?,
        ChunkType::Metadata => chunk.data.clone(),
    };
    self.stats.lock().await.frames_decoded += 1;
    Ok(RawMediaData {
        data: decoded_data,
        media_type: match chunk.chunk_type {
            ChunkType::Audio => MediaType::Audio,
            _ => MediaType::Video,
        },
        timestamp: chunk.timestamp,
        metadata: HashMap::new(),
    })
}
/// Decodes video bytes: with `config.enable_compression` every input byte is
/// emitted twice, otherwise the input is copied unchanged. Never fails.
async fn decode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
    let copies = if self.config.enable_compression { 2 } else { 1 };
    let mut decoded = Vec::with_capacity(data.len() * 2);
    for &byte in data {
        decoded.extend(std::iter::repeat(byte).take(copies));
    }
    Ok(decoded)
}
/// Decodes audio bytes by emitting every input byte four times. Never fails.
async fn decode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
    let mut decoded = Vec::with_capacity(data.len() * 4);
    decoded.extend(
        data.iter()
            .flat_map(|&sample| std::iter::repeat(sample).take(4)),
    );
    Ok(decoded)
}
/// Returns a snapshot (clone) of the current statistics.
pub async fn get_stats(&self) -> CodecStats {
    self.stats.lock().await.clone()
}
/// Resets every statistic to its default (zero) value. Because the stats are
/// shared, this also affects every clone of this codec.
pub async fn reset_stats(&self) {
    *self.stats.lock().await = CodecStats::default();
}
}
/// Stateless holder for the preset `MediaCodec` constructors below.
pub struct CodecFactory;
impl CodecFactory {
/// Builds a codec preset whose target bitrate is picked from `quality`
/// (500_000 / 1_500_000 / 5_000_000 / 15_000_000), with a keyframe interval
/// of 30 and both compression and checksums enabled.
pub fn create_h264_codec(quality: QualityLevel) -> MediaCodec {
    let target_bitrate = match quality {
        QualityLevel::Low => 500_000,
        QualityLevel::Medium => 1_500_000,
        QualityLevel::High => 5_000_000,
        QualityLevel::UltraHigh => 15_000_000,
    };
    MediaCodec::new(EncodingConfig {
        quality,
        target_bitrate,
        keyframe_interval: 30,
        enable_compression: true,
        preserve_metadata: true,
    })
}
/// Builds a codec preset whose target bitrate is picked from `quality`
/// (64_000 / 128_000 / 320_000 / 500_000), with a keyframe interval of 0,
/// compression enabled and checksums disabled.
pub fn create_audio_codec(quality: QualityLevel) -> MediaCodec {
    let target_bitrate = match quality {
        QualityLevel::Low => 64_000,
        QualityLevel::Medium => 128_000,
        QualityLevel::High => 320_000,
        QualityLevel::UltraHigh => 500_000,
    };
    MediaCodec::new(EncodingConfig {
        quality,
        target_bitrate,
        keyframe_interval: 0,
        enable_compression: true,
        preserve_metadata: false,
    })
}
/// Builds a medium-quality codec preset with a target bitrate of 1_000_000,
/// a keyframe interval of 60, and both compression and checksums enabled.
pub fn create_adaptive_codec() -> MediaCodec {
    MediaCodec::new(EncodingConfig {
        quality: QualityLevel::Medium,
        target_bitrate: 1_000_000,
        keyframe_interval: 60,
        enable_compression: true,
        preserve_metadata: true,
    })
}
}