1use super::types::*;
7use crate::*;
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::time::Instant;
12use std::sync::Arc;
13
14#[derive(Debug, Clone)]
16pub enum CodecError {
17 UnsupportedFormat(String),
18 EncodingFailed(String),
19 DecodingFailed(String),
20 InvalidData(String),
21 ResourceExhausted,
22}
23
24impl std::fmt::Display for CodecError {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 match self {
27 CodecError::UnsupportedFormat(format) => write!(f, "Unsupported format: {}", format),
28 CodecError::EncodingFailed(reason) => write!(f, "Encoding failed: {}", reason),
29 CodecError::DecodingFailed(reason) => write!(f, "Decoding failed: {}", reason),
30 CodecError::InvalidData(reason) => write!(f, "Invalid data: {}", reason),
31 CodecError::ResourceExhausted => write!(f, "Resource exhausted"),
32 }
33 }
34}
35
36impl std::error::Error for CodecError {}
37
38#[derive(Debug, Clone)]
40pub struct EncodingConfig {
41 pub quality: QualityLevel,
42 pub target_bitrate: u32,
43 pub keyframe_interval: u32,
44 pub enable_compression: bool,
45 pub preserve_metadata: bool,
46}
47
48impl Default for EncodingConfig {
49 fn default() -> Self {
50 Self {
51 quality: QualityLevel::Medium,
52 target_bitrate: 1_000_000, keyframe_interval: 30, enable_compression: true,
55 preserve_metadata: true,
56 }
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct RawMediaData {
63 pub data: Vec<u8>,
64 pub media_type: MediaType,
65 pub timestamp: Duration,
66 pub metadata: HashMap<String, String>,
67}
68
69#[derive(Debug, Clone, Default)]
71pub struct CodecStats {
72 pub frames_encoded: u64,
73 pub frames_decoded: u64,
74 pub bytes_processed: u64,
75 pub encoding_time_ms: u64,
76 pub average_compression_ratio: f64,
77 pub error_count: u64,
78}
79
80pub struct MediaCodec {
82 config: EncodingConfig,
83 stats: Arc<tokio::sync::Mutex<CodecStats>>,
84}
85
86impl Clone for MediaCodec {
87 fn clone(&self) -> Self {
88 Self {
89 config: self.config.clone(),
90 stats: Arc::clone(&self.stats),
91 }
92 }
93}
94
95impl MediaCodec {
96 pub fn new(config: EncodingConfig) -> Self {
97 Self {
98 config,
99 stats: Arc::new(tokio::sync::Mutex::new(CodecStats::default())),
100 }
101 }
102
103 pub fn encode_stream(
105 &self,
106 raw_data_stream: RS2Stream<RawMediaData>,
107 stream_id: String,
108 ) -> RS2Stream<Result<MediaChunk, CodecError>> {
109 let self_clone = self.clone();
110 auto_backpressure_block(
111 par_eval_map(raw_data_stream, 4, move |raw_data| {
112 let stream_id = stream_id.clone();
113 let codec = self_clone.clone();
114 async move { codec.encode_single_frame(raw_data, stream_id).await }
115 }),
116 256,
117 ) }
119
120 async fn encode_single_frame(
122 &self,
123 raw_data: RawMediaData,
124 stream_id: String,
125 ) -> Result<MediaChunk, CodecError> {
126 let start_time = Instant::now();
127
128 let encoded_data = self.perform_encoding(&raw_data).await?;
130
131 let chunk_type = self.determine_chunk_type(&raw_data, &encoded_data);
133 let priority = chunk_type.default_priority();
134
135 let checksum = if self.config.preserve_metadata {
137 Some(self.calculate_checksum(&encoded_data))
138 } else {
139 None
140 };
141
142 {
144 let mut stats = self.stats.lock().await;
145 stats.frames_encoded += 1;
146 stats.bytes_processed += encoded_data.len() as u64;
147
148 let elapsed_ms = start_time.elapsed().as_millis() as u64;
150 let encoding_time = if elapsed_ms == 0 { 1 } else { elapsed_ms };
151 stats.encoding_time_ms += encoding_time;
152
153 let compression_ratio = raw_data.data.len() as f64 / encoded_data.len() as f64;
154 stats.average_compression_ratio = (stats.average_compression_ratio
155 * (stats.frames_encoded - 1) as f64
156 + compression_ratio)
157 / stats.frames_encoded as f64;
158 }
159
160 Ok(MediaChunk {
161 stream_id,
162 sequence_number: 0, data: encoded_data,
164 chunk_type,
165 priority,
166 timestamp: raw_data.timestamp,
167 is_final: false,
168 checksum,
169 })
170 }
171
172 async fn perform_encoding(&self, raw_data: &RawMediaData) -> Result<Vec<u8>, CodecError> {
174 tokio::time::sleep(Duration::from_micros(100)).await;
176
177 match raw_data.media_type {
178 MediaType::Video => self.encode_video(&raw_data.data).await,
179 MediaType::Audio => self.encode_audio(&raw_data.data).await,
180 MediaType::Mixed => {
181 let (video_data, audio_data) = self.split_mixed_data(&raw_data.data)?;
183 let encoded_video = self.encode_video(&video_data).await?;
184 let encoded_audio = self.encode_audio(&audio_data).await?;
185 Ok(self.combine_encoded_data(encoded_video, encoded_audio))
186 }
187 }
188 }
189
190 async fn encode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
191 if data.is_empty() {
192 return Err(CodecError::InvalidData("Empty video data".to_string()));
193 }
194
195 let compression_factor = match self.config.quality {
197 QualityLevel::Low => 0.3,
198 QualityLevel::Medium => 0.5,
199 QualityLevel::High => 0.7,
200 QualityLevel::UltraHigh => 0.9,
201 };
202
203 let target_size = (data.len() as f64 * compression_factor) as usize;
204 let mut encoded = Vec::with_capacity(target_size);
205
206 for chunk in data.chunks(8) {
208 let compressed_chunk: Vec<u8> = chunk
209 .iter()
210 .step_by(if self.config.enable_compression { 2 } else { 1 })
211 .copied()
212 .collect();
213 encoded.extend(compressed_chunk);
214 }
215
216 Ok(encoded)
217 }
218
219 async fn encode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
220 if data.is_empty() {
221 return Err(CodecError::InvalidData("Empty audio data".to_string()));
222 }
223
224 let compression_factor = match self.config.quality {
226 QualityLevel::Low => 0.2,
227 QualityLevel::Medium => 0.4,
228 QualityLevel::High => 0.6,
229 QualityLevel::UltraHigh => 0.8,
230 };
231
232 let target_size = (data.len() as f64 * compression_factor) as usize;
233 let mut encoded = Vec::with_capacity(target_size);
234
235 for chunk in data.chunks(4) {
237 if let Some(&sample) = chunk.first() {
238 encoded.push(sample);
239 }
240 }
241
242 Ok(encoded)
243 }
244
245 fn split_mixed_data(&self, data: &[u8]) -> Result<(Vec<u8>, Vec<u8>), CodecError> {
246 if data.len() < 2 {
247 return Err(CodecError::InvalidData("Mixed data too short".to_string()));
248 }
249
250 let split_point = (data.len() as f64 * 0.6) as usize;
252 let video_data = data[..split_point].to_vec();
253 let audio_data = data[split_point..].to_vec();
254
255 Ok((video_data, audio_data))
256 }
257
258 fn combine_encoded_data(&self, video: Vec<u8>, audio: Vec<u8>) -> Vec<u8> {
259 let mut combined = Vec::with_capacity(video.len() + audio.len() + 8);
260
261 combined.extend_from_slice(&(video.len() as u32).to_le_bytes());
263 combined.extend_from_slice(&(audio.len() as u32).to_le_bytes());
264 combined.extend(video);
265 combined.extend(audio);
266
267 combined
268 }
269
270 fn determine_chunk_type(&self, raw_data: &RawMediaData, _encoded_data: &[u8]) -> ChunkType {
271 match raw_data.media_type {
272 MediaType::Audio => ChunkType::Audio,
273 MediaType::Video => {
274 if raw_data.timestamp.as_millis() % (self.config.keyframe_interval as u128 * 33)
276 == 0
277 {
278 ChunkType::VideoIFrame
279 } else {
280 ChunkType::VideoPFrame
281 }
282 }
283 MediaType::Mixed => ChunkType::VideoIFrame, }
285 }
286
287 fn calculate_checksum(&self, data: &[u8]) -> String {
288 let mut hasher = Sha256::new();
289 hasher.update(data);
290 format!("{:x}", hasher.finalize())
291 }
292
293 pub async fn decode_chunk(&self, chunk: MediaChunk) -> Result<RawMediaData, CodecError> {
295 let decoded_data = match chunk.chunk_type {
296 ChunkType::Audio => self.decode_audio(&chunk.data).await?,
297 ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
298 self.decode_video(&chunk.data).await?
299 }
300 ChunkType::Metadata => chunk.data.clone(), ChunkType::Thumbnail => self.decode_video(&chunk.data).await?, };
303
304 if let Some(expected_checksum) = &chunk.checksum {
306 let actual_checksum = self.calculate_checksum(&chunk.data);
307 if actual_checksum != *expected_checksum {
308 return Err(CodecError::InvalidData("Checksum mismatch".to_string()));
309 }
310 }
311
312 let mut stats = self.stats.lock().await;
313 stats.frames_decoded += 1;
314
315 Ok(RawMediaData {
316 data: decoded_data,
317 media_type: match chunk.chunk_type {
318 ChunkType::Audio => MediaType::Audio,
319 _ => MediaType::Video,
320 },
321 timestamp: chunk.timestamp,
322 metadata: HashMap::new(),
323 })
324 }
325
326 async fn decode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
327 let mut decoded = Vec::with_capacity(data.len() * 2);
329
330 for &byte in data {
331 decoded.push(byte);
332 if self.config.enable_compression {
333 decoded.push(byte); }
335 }
336
337 Ok(decoded)
338 }
339
340 async fn decode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
341 let mut decoded = Vec::with_capacity(data.len() * 4);
343
344 for &sample in data {
345 decoded.extend_from_slice(&[sample, sample, sample, sample]);
347 }
348
349 Ok(decoded)
350 }
351
352 pub async fn get_stats(&self) -> CodecStats {
354 let stats = self.stats.lock().await;
355 stats.clone()
356 }
357
358 pub async fn reset_stats(&self) {
360 let mut stats = self.stats.lock().await;
361 *stats = CodecStats::default();
362 }
363}
364
365pub struct CodecFactory;
367
368impl CodecFactory {
369 pub fn create_h264_codec(quality: QualityLevel) -> MediaCodec {
370 let config = EncodingConfig {
371 quality,
372 target_bitrate: match quality {
373 QualityLevel::Low => 500_000, QualityLevel::Medium => 1_500_000, QualityLevel::High => 5_000_000, QualityLevel::UltraHigh => 15_000_000, },
378 keyframe_interval: 30,
379 enable_compression: true,
380 preserve_metadata: true,
381 };
382
383 MediaCodec::new(config)
384 }
385
386 pub fn create_audio_codec(quality: QualityLevel) -> MediaCodec {
387 let config = EncodingConfig {
388 quality,
389 target_bitrate: match quality {
390 QualityLevel::Low => 64_000, QualityLevel::Medium => 128_000, QualityLevel::High => 320_000, QualityLevel::UltraHigh => 500_000, },
395 keyframe_interval: 0, enable_compression: true,
397 preserve_metadata: false, };
399
400 MediaCodec::new(config)
401 }
402
403 pub fn create_adaptive_codec() -> MediaCodec {
404 let config = EncodingConfig {
406 quality: QualityLevel::Medium, target_bitrate: 1_000_000,
408 keyframe_interval: 60, enable_compression: true,
410 preserve_metadata: true,
411 };
412
413 MediaCodec::new(config)
414 }
415}