1use super::types::*;
7use crate::*;
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::time::Instant;
12
13#[derive(Debug, Clone)]
15pub enum CodecError {
16 UnsupportedFormat(String),
17 EncodingFailed(String),
18 DecodingFailed(String),
19 InvalidData(String),
20 ResourceExhausted,
21}
22
23impl std::fmt::Display for CodecError {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 CodecError::UnsupportedFormat(format) => write!(f, "Unsupported format: {}", format),
27 CodecError::EncodingFailed(reason) => write!(f, "Encoding failed: {}", reason),
28 CodecError::DecodingFailed(reason) => write!(f, "Decoding failed: {}", reason),
29 CodecError::InvalidData(reason) => write!(f, "Invalid data: {}", reason),
30 CodecError::ResourceExhausted => write!(f, "Resource exhausted"),
31 }
32 }
33}
34
35impl std::error::Error for CodecError {}
36
37#[derive(Debug, Clone)]
39pub struct EncodingConfig {
40 pub quality: QualityLevel,
41 pub target_bitrate: u32,
42 pub keyframe_interval: u32,
43 pub enable_compression: bool,
44 pub preserve_metadata: bool,
45}
46
47impl Default for EncodingConfig {
48 fn default() -> Self {
49 Self {
50 quality: QualityLevel::Medium,
51 target_bitrate: 1_000_000, keyframe_interval: 30, enable_compression: true,
54 preserve_metadata: true,
55 }
56 }
57}
58
59#[derive(Debug, Clone)]
61pub struct RawMediaData {
62 pub data: Vec<u8>,
63 pub media_type: MediaType,
64 pub timestamp: Duration,
65 pub metadata: HashMap<String, String>,
66}
67
68#[derive(Debug, Clone, Default)]
70pub struct CodecStats {
71 pub frames_encoded: u64,
72 pub frames_decoded: u64,
73 pub bytes_processed: u64,
74 pub encoding_time_ms: u64,
75 pub average_compression_ratio: f64,
76 pub error_count: u64,
77}
78
79pub struct MediaCodec {
81 config: EncodingConfig,
82 stats: tokio::sync::Mutex<CodecStats>,
83}
84
85impl Clone for MediaCodec {
86 fn clone(&self) -> Self {
87 Self {
88 config: self.config.clone(),
89 stats: tokio::sync::Mutex::new(CodecStats::default()),
90 }
91 }
92}
93
94impl MediaCodec {
95 pub fn new(config: EncodingConfig) -> Self {
96 Self {
97 config,
98 stats: tokio::sync::Mutex::new(CodecStats::default()),
99 }
100 }
101
102 pub fn encode_stream(
104 &self,
105 raw_data_stream: RS2Stream<RawMediaData>,
106 stream_id: String,
107 ) -> RS2Stream<Result<MediaChunk, CodecError>> {
108 let self_clone = self.clone();
109 auto_backpressure_block(par_eval_map(raw_data_stream, 4, move |raw_data| {
110 let stream_id = stream_id.clone();
111 let codec = self_clone.clone();
112 async move { codec.encode_single_frame(raw_data, stream_id).await }
113 }), 256) }
115
116 async fn encode_single_frame(
118 &self,
119 raw_data: RawMediaData,
120 stream_id: String,
121 ) -> Result<MediaChunk, CodecError> {
122 let start_time = Instant::now();
123
124 let encoded_data = self.perform_encoding(&raw_data).await?;
126
127 let chunk_type = self.determine_chunk_type(&raw_data, &encoded_data);
129 let priority = chunk_type.default_priority();
130
131 let checksum = if self.config.preserve_metadata {
133 Some(self.calculate_checksum(&encoded_data))
134 } else {
135 None
136 };
137
138 {
140 let mut stats = self.stats.lock().await;
141 stats.frames_encoded += 1;
142 stats.bytes_processed += encoded_data.len() as u64;
143 stats.encoding_time_ms += start_time.elapsed().as_millis() as u64;
144
145 let compression_ratio = raw_data.data.len() as f64 / encoded_data.len() as f64;
146 stats.average_compression_ratio = (stats.average_compression_ratio
147 * (stats.frames_encoded - 1) as f64
148 + compression_ratio)
149 / stats.frames_encoded as f64;
150 }
151
152 Ok(MediaChunk {
153 stream_id,
154 sequence_number: 0, data: encoded_data,
156 chunk_type,
157 priority,
158 timestamp: raw_data.timestamp,
159 is_final: false,
160 checksum,
161 })
162 }
163
164 async fn perform_encoding(&self, raw_data: &RawMediaData) -> Result<Vec<u8>, CodecError> {
166 tokio::time::sleep(Duration::from_micros(100)).await;
168
169 match raw_data.media_type {
170 MediaType::Video => self.encode_video(&raw_data.data).await,
171 MediaType::Audio => self.encode_audio(&raw_data.data).await,
172 MediaType::Mixed => {
173 let (video_data, audio_data) = self.split_mixed_data(&raw_data.data)?;
175 let encoded_video = self.encode_video(&video_data).await?;
176 let encoded_audio = self.encode_audio(&audio_data).await?;
177 Ok(self.combine_encoded_data(encoded_video, encoded_audio))
178 }
179 }
180 }
181
182 async fn encode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
183 if data.is_empty() {
184 return Err(CodecError::InvalidData("Empty video data".to_string()));
185 }
186
187 let compression_factor = match self.config.quality {
189 QualityLevel::Low => 0.3,
190 QualityLevel::Medium => 0.5,
191 QualityLevel::High => 0.7,
192 QualityLevel::UltraHigh => 0.9,
193 };
194
195 let target_size = (data.len() as f64 * compression_factor) as usize;
196 let mut encoded = Vec::with_capacity(target_size);
197
198 for chunk in data.chunks(8) {
200 let compressed_chunk: Vec<u8> = chunk
201 .iter()
202 .step_by(if self.config.enable_compression { 2 } else { 1 })
203 .copied()
204 .collect();
205 encoded.extend(compressed_chunk);
206 }
207
208 Ok(encoded)
209 }
210
211 async fn encode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
212 if data.is_empty() {
213 return Err(CodecError::InvalidData("Empty audio data".to_string()));
214 }
215
216 let compression_factor = match self.config.quality {
218 QualityLevel::Low => 0.2,
219 QualityLevel::Medium => 0.4,
220 QualityLevel::High => 0.6,
221 QualityLevel::UltraHigh => 0.8,
222 };
223
224 let target_size = (data.len() as f64 * compression_factor) as usize;
225 let mut encoded = Vec::with_capacity(target_size);
226
227 for chunk in data.chunks(4) {
229 if let Some(&sample) = chunk.first() {
230 encoded.push(sample);
231 }
232 }
233
234 Ok(encoded)
235 }
236
237 fn split_mixed_data(&self, data: &[u8]) -> Result<(Vec<u8>, Vec<u8>), CodecError> {
238 if data.len() < 2 {
239 return Err(CodecError::InvalidData("Mixed data too short".to_string()));
240 }
241
242 let split_point = (data.len() as f64 * 0.6) as usize;
244 let video_data = data[..split_point].to_vec();
245 let audio_data = data[split_point..].to_vec();
246
247 Ok((video_data, audio_data))
248 }
249
250 fn combine_encoded_data(&self, video: Vec<u8>, audio: Vec<u8>) -> Vec<u8> {
251 let mut combined = Vec::with_capacity(video.len() + audio.len() + 8);
252
253 combined.extend_from_slice(&(video.len() as u32).to_le_bytes());
255 combined.extend_from_slice(&(audio.len() as u32).to_le_bytes());
256 combined.extend(video);
257 combined.extend(audio);
258
259 combined
260 }
261
262 fn determine_chunk_type(&self, raw_data: &RawMediaData, _encoded_data: &[u8]) -> ChunkType {
263 match raw_data.media_type {
264 MediaType::Audio => ChunkType::Audio,
265 MediaType::Video => {
266 if raw_data.timestamp.as_millis() % (self.config.keyframe_interval as u128 * 33)
268 == 0
269 {
270 ChunkType::VideoIFrame
271 } else {
272 ChunkType::VideoPFrame
273 }
274 }
275 MediaType::Mixed => ChunkType::VideoIFrame, }
277 }
278
279 fn calculate_checksum(&self, data: &[u8]) -> String {
280 let mut hasher = Sha256::new();
281 hasher.update(data);
282 format!("{:x}", hasher.finalize())
283 }
284
285 pub async fn decode_chunk(&self, chunk: MediaChunk) -> Result<RawMediaData, CodecError> {
287 let decoded_data = match chunk.chunk_type {
288 ChunkType::Audio => self.decode_audio(&chunk.data).await?,
289 ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
290 self.decode_video(&chunk.data).await?
291 }
292 ChunkType::Metadata => chunk.data.clone(), ChunkType::Thumbnail => self.decode_video(&chunk.data).await?, };
295
296 if let Some(expected_checksum) = &chunk.checksum {
298 let actual_checksum = self.calculate_checksum(&chunk.data);
299 if actual_checksum != *expected_checksum {
300 return Err(CodecError::InvalidData("Checksum mismatch".to_string()));
301 }
302 }
303
304 let mut stats = self.stats.lock().await;
305 stats.frames_decoded += 1;
306
307 Ok(RawMediaData {
308 data: decoded_data,
309 media_type: match chunk.chunk_type {
310 ChunkType::Audio => MediaType::Audio,
311 _ => MediaType::Video,
312 },
313 timestamp: chunk.timestamp,
314 metadata: HashMap::new(),
315 })
316 }
317
318 async fn decode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
319 let mut decoded = Vec::with_capacity(data.len() * 2);
321
322 for &byte in data {
323 decoded.push(byte);
324 if self.config.enable_compression {
325 decoded.push(byte); }
327 }
328
329 Ok(decoded)
330 }
331
332 async fn decode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
333 let mut decoded = Vec::with_capacity(data.len() * 4);
335
336 for &sample in data {
337 decoded.extend_from_slice(&[sample, sample, sample, sample]);
339 }
340
341 Ok(decoded)
342 }
343
344 pub async fn get_stats(&self) -> CodecStats {
346 let stats = self.stats.lock().await;
347 stats.clone()
348 }
349
350 pub async fn reset_stats(&self) {
352 let mut stats = self.stats.lock().await;
353 *stats = CodecStats::default();
354 }
355}
356
357pub struct CodecFactory;
359
360impl CodecFactory {
361 pub fn create_h264_codec(quality: QualityLevel) -> MediaCodec {
362 let config = EncodingConfig {
363 quality,
364 target_bitrate: match quality {
365 QualityLevel::Low => 500_000, QualityLevel::Medium => 1_500_000, QualityLevel::High => 5_000_000, QualityLevel::UltraHigh => 15_000_000, },
370 keyframe_interval: 30,
371 enable_compression: true,
372 preserve_metadata: true,
373 };
374
375 MediaCodec::new(config)
376 }
377
378 pub fn create_audio_codec(quality: QualityLevel) -> MediaCodec {
379 let config = EncodingConfig {
380 quality,
381 target_bitrate: match quality {
382 QualityLevel::Low => 64_000, QualityLevel::Medium => 128_000, QualityLevel::High => 320_000, QualityLevel::UltraHigh => 500_000, },
387 keyframe_interval: 0, enable_compression: true,
389 preserve_metadata: false, };
391
392 MediaCodec::new(config)
393 }
394
395 pub fn create_adaptive_codec() -> MediaCodec {
396 let config = EncodingConfig {
398 quality: QualityLevel::Medium, target_bitrate: 1_000_000,
400 keyframe_interval: 60, enable_compression: true,
402 preserve_metadata: true,
403 };
404
405 MediaCodec::new(config)
406 }
407}