rs2_stream/media/
codec.rs

1//! Codec implementations for media processing
2//!
3//! Provides encoding/decoding capabilities with support for different formats
4//! and quality levels. Uses async processing to integrate with RS2Stream.
5
6use super::types::*;
7use crate::*;
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::time::Duration;
11use tokio::time::Instant;
12
13/// Errors that can occur during codec operations
14#[derive(Debug, Clone)]
15pub enum CodecError {
16    UnsupportedFormat(String),
17    EncodingFailed(String),
18    DecodingFailed(String),
19    InvalidData(String),
20    ResourceExhausted,
21}
22
23impl std::fmt::Display for CodecError {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            CodecError::UnsupportedFormat(format) => write!(f, "Unsupported format: {}", format),
27            CodecError::EncodingFailed(reason) => write!(f, "Encoding failed: {}", reason),
28            CodecError::DecodingFailed(reason) => write!(f, "Decoding failed: {}", reason),
29            CodecError::InvalidData(reason) => write!(f, "Invalid data: {}", reason),
30            CodecError::ResourceExhausted => write!(f, "Resource exhausted"),
31        }
32    }
33}
34
35impl std::error::Error for CodecError {}
36
37/// Configuration for encoding operations
38#[derive(Debug, Clone)]
39pub struct EncodingConfig {
40    pub quality: QualityLevel,
41    pub target_bitrate: u32,
42    pub keyframe_interval: u32,
43    pub enable_compression: bool,
44    pub preserve_metadata: bool,
45}
46
47impl Default for EncodingConfig {
48    fn default() -> Self {
49        Self {
50            quality: QualityLevel::Medium,
51            target_bitrate: 1_000_000, // 1 Mbps
52            keyframe_interval: 30,     // Every 30 frames
53            enable_compression: true,
54            preserve_metadata: true,
55        }
56    }
57}
58
59/// Raw media data before processing
60#[derive(Debug, Clone)]
61pub struct RawMediaData {
62    pub data: Vec<u8>,
63    pub media_type: MediaType,
64    pub timestamp: Duration,
65    pub metadata: HashMap<String, String>,
66}
67
68/// Codec statistics for monitoring
69#[derive(Debug, Clone, Default)]
70pub struct CodecStats {
71    pub frames_encoded: u64,
72    pub frames_decoded: u64,
73    pub bytes_processed: u64,
74    pub encoding_time_ms: u64,
75    pub average_compression_ratio: f64,
76    pub error_count: u64,
77}
78
79/// Main codec implementation
80pub struct MediaCodec {
81    config: EncodingConfig,
82    stats: tokio::sync::Mutex<CodecStats>,
83}
84
85impl Clone for MediaCodec {
86    fn clone(&self) -> Self {
87        Self {
88            config: self.config.clone(),
89            stats: tokio::sync::Mutex::new(CodecStats::default()),
90        }
91    }
92}
93
94impl MediaCodec {
95    pub fn new(config: EncodingConfig) -> Self {
96        Self {
97            config,
98            stats: tokio::sync::Mutex::new(CodecStats::default()),
99        }
100    }
101
102    /// Encode raw media data into chunks
103    pub fn encode_stream(
104        &self,
105        raw_data_stream: RS2Stream<RawMediaData>,
106        stream_id: String,
107    ) -> RS2Stream<Result<MediaChunk, CodecError>> {
108        let self_clone = self.clone();
109        auto_backpressure_block(par_eval_map(raw_data_stream, 4, move |raw_data| {
110            let stream_id = stream_id.clone();
111            let codec = self_clone.clone();
112            async move { codec.encode_single_frame(raw_data, stream_id).await }
113        }), 256) // Prevent encoder from overwhelming system
114    }
115
116    /// Encode a single frame/audio sample
117    async fn encode_single_frame(
118        &self,
119        raw_data: RawMediaData,
120        stream_id: String,
121    ) -> Result<MediaChunk, CodecError> {
122        let start_time = Instant::now();
123
124        // Simulate encoding process
125        let encoded_data = self.perform_encoding(&raw_data).await?;
126
127        // Determine chunk type based on content
128        let chunk_type = self.determine_chunk_type(&raw_data, &encoded_data);
129        let priority = chunk_type.default_priority();
130
131        // Generate checksum for integrity
132        let checksum = if self.config.preserve_metadata {
133            Some(self.calculate_checksum(&encoded_data))
134        } else {
135            None
136        };
137
138        // Update statistics
139        {
140            let mut stats = self.stats.lock().await;
141            stats.frames_encoded += 1;
142            stats.bytes_processed += encoded_data.len() as u64;
143            stats.encoding_time_ms += start_time.elapsed().as_millis() as u64;
144
145            let compression_ratio = raw_data.data.len() as f64 / encoded_data.len() as f64;
146            stats.average_compression_ratio = (stats.average_compression_ratio
147                * (stats.frames_encoded - 1) as f64
148                + compression_ratio)
149                / stats.frames_encoded as f64;
150        }
151
152        Ok(MediaChunk {
153            stream_id,
154            sequence_number: 0, // Will be set by chunk processor
155            data: encoded_data,
156            chunk_type,
157            priority,
158            timestamp: raw_data.timestamp,
159            is_final: false,
160            checksum,
161        })
162    }
163
164    /// Perform the actual encoding (simplified mock implementation)
165    async fn perform_encoding(&self, raw_data: &RawMediaData) -> Result<Vec<u8>, CodecError> {
166        // Simulate encoding work
167        tokio::time::sleep(Duration::from_micros(100)).await;
168
169        match raw_data.media_type {
170            MediaType::Video => self.encode_video(&raw_data.data).await,
171            MediaType::Audio => self.encode_audio(&raw_data.data).await,
172            MediaType::Mixed => {
173                // Split and encode both components
174                let (video_data, audio_data) = self.split_mixed_data(&raw_data.data)?;
175                let encoded_video = self.encode_video(&video_data).await?;
176                let encoded_audio = self.encode_audio(&audio_data).await?;
177                Ok(self.combine_encoded_data(encoded_video, encoded_audio))
178            }
179        }
180    }
181
182    async fn encode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
183        if data.is_empty() {
184            return Err(CodecError::InvalidData("Empty video data".to_string()));
185        }
186
187        // Mock video encoding with compression based on quality
188        let compression_factor = match self.config.quality {
189            QualityLevel::Low => 0.3,
190            QualityLevel::Medium => 0.5,
191            QualityLevel::High => 0.7,
192            QualityLevel::UltraHigh => 0.9,
193        };
194
195        let target_size = (data.len() as f64 * compression_factor) as usize;
196        let mut encoded = Vec::with_capacity(target_size);
197
198        // Simulate compression (in reality, this would use actual video codecs)
199        for chunk in data.chunks(8) {
200            let compressed_chunk: Vec<u8> = chunk
201                .iter()
202                .step_by(if self.config.enable_compression { 2 } else { 1 })
203                .copied()
204                .collect();
205            encoded.extend(compressed_chunk);
206        }
207
208        Ok(encoded)
209    }
210
211    async fn encode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
212        if data.is_empty() {
213            return Err(CodecError::InvalidData("Empty audio data".to_string()));
214        }
215
216        // Mock audio encoding - audio typically compresses better than video
217        let compression_factor = match self.config.quality {
218            QualityLevel::Low => 0.2,
219            QualityLevel::Medium => 0.4,
220            QualityLevel::High => 0.6,
221            QualityLevel::UltraHigh => 0.8,
222        };
223
224        let target_size = (data.len() as f64 * compression_factor) as usize;
225        let mut encoded = Vec::with_capacity(target_size);
226
227        // Simulate audio compression
228        for chunk in data.chunks(4) {
229            if let Some(&sample) = chunk.first() {
230                encoded.push(sample);
231            }
232        }
233
234        Ok(encoded)
235    }
236
237    fn split_mixed_data(&self, data: &[u8]) -> Result<(Vec<u8>, Vec<u8>), CodecError> {
238        if data.len() < 2 {
239            return Err(CodecError::InvalidData("Mixed data too short".to_string()));
240        }
241
242        // Simple split: assume first 60% is video, rest is audio
243        let split_point = (data.len() as f64 * 0.6) as usize;
244        let video_data = data[..split_point].to_vec();
245        let audio_data = data[split_point..].to_vec();
246
247        Ok((video_data, audio_data))
248    }
249
250    fn combine_encoded_data(&self, video: Vec<u8>, audio: Vec<u8>) -> Vec<u8> {
251        let mut combined = Vec::with_capacity(video.len() + audio.len() + 8);
252
253        // Add a simple header indicating sizes
254        combined.extend_from_slice(&(video.len() as u32).to_le_bytes());
255        combined.extend_from_slice(&(audio.len() as u32).to_le_bytes());
256        combined.extend(video);
257        combined.extend(audio);
258
259        combined
260    }
261
262    fn determine_chunk_type(&self, raw_data: &RawMediaData, _encoded_data: &[u8]) -> ChunkType {
263        match raw_data.media_type {
264            MediaType::Audio => ChunkType::Audio,
265            MediaType::Video => {
266                // Simulate keyframe detection (every N frames)
267                if raw_data.timestamp.as_millis() % (self.config.keyframe_interval as u128 * 33)
268                    == 0
269                {
270                    ChunkType::VideoIFrame
271                } else {
272                    ChunkType::VideoPFrame
273                }
274            }
275            MediaType::Mixed => ChunkType::VideoIFrame, // Treat mixed as high priority
276        }
277    }
278
279    fn calculate_checksum(&self, data: &[u8]) -> String {
280        let mut hasher = Sha256::new();
281        hasher.update(data);
282        format!("{:x}", hasher.finalize())
283    }
284
285    /// Decode chunks back to raw data (for testing/validation)
286    pub async fn decode_chunk(&self, chunk: MediaChunk) -> Result<RawMediaData, CodecError> {
287        let decoded_data = match chunk.chunk_type {
288            ChunkType::Audio => self.decode_audio(&chunk.data).await?,
289            ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
290                self.decode_video(&chunk.data).await?
291            }
292            ChunkType::Metadata => chunk.data.clone(), // Metadata passed through
293            ChunkType::Thumbnail => self.decode_video(&chunk.data).await?, // Treat as video
294        };
295
296        // Verify checksum if present
297        if let Some(expected_checksum) = &chunk.checksum {
298            let actual_checksum = self.calculate_checksum(&chunk.data);
299            if actual_checksum != *expected_checksum {
300                return Err(CodecError::InvalidData("Checksum mismatch".to_string()));
301            }
302        }
303
304        let mut stats = self.stats.lock().await;
305        stats.frames_decoded += 1;
306
307        Ok(RawMediaData {
308            data: decoded_data,
309            media_type: match chunk.chunk_type {
310                ChunkType::Audio => MediaType::Audio,
311                _ => MediaType::Video,
312            },
313            timestamp: chunk.timestamp,
314            metadata: HashMap::new(),
315        })
316    }
317
318    async fn decode_video(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
319        // Mock video decoding - expand compressed data
320        let mut decoded = Vec::with_capacity(data.len() * 2);
321
322        for &byte in data {
323            decoded.push(byte);
324            if self.config.enable_compression {
325                decoded.push(byte); // Duplicate to simulate decompression
326            }
327        }
328
329        Ok(decoded)
330    }
331
332    async fn decode_audio(&self, data: &[u8]) -> Result<Vec<u8>, CodecError> {
333        // Mock audio decoding
334        let mut decoded = Vec::with_capacity(data.len() * 4);
335
336        for &sample in data {
337            // Expand each sample
338            decoded.extend_from_slice(&[sample, sample, sample, sample]);
339        }
340
341        Ok(decoded)
342    }
343
344    /// Get current codec statistics
345    pub async fn get_stats(&self) -> CodecStats {
346        let stats = self.stats.lock().await;
347        stats.clone()
348    }
349
350    /// Reset statistics
351    pub async fn reset_stats(&self) {
352        let mut stats = self.stats.lock().await;
353        *stats = CodecStats::default();
354    }
355}
356
357/// Factory for creating codecs with different configurations
358pub struct CodecFactory;
359
360impl CodecFactory {
361    pub fn create_h264_codec(quality: QualityLevel) -> MediaCodec {
362        let config = EncodingConfig {
363            quality,
364            target_bitrate: match quality {
365                QualityLevel::Low => 500_000,          // 500 Kbps
366                QualityLevel::Medium => 1_500_000,     // 1.5 Mbps
367                QualityLevel::High => 5_000_000,       // 5 Mbps
368                QualityLevel::UltraHigh => 15_000_000, // 15 Mbps
369            },
370            keyframe_interval: 30,
371            enable_compression: true,
372            preserve_metadata: true,
373        };
374
375        MediaCodec::new(config)
376    }
377
378    pub fn create_audio_codec(quality: QualityLevel) -> MediaCodec {
379        let config = EncodingConfig {
380            quality,
381            target_bitrate: match quality {
382                QualityLevel::Low => 64_000,        // 64 Kbps
383                QualityLevel::Medium => 128_000,    // 128 Kbps
384                QualityLevel::High => 320_000,      // 320 Kbps
385                QualityLevel::UltraHigh => 500_000, // 500 Kbps
386            },
387            keyframe_interval: 0, // N/A for audio
388            enable_compression: true,
389            preserve_metadata: false, // Audio usually doesn't need checksums
390        };
391
392        MediaCodec::new(config)
393    }
394
395    pub fn create_adaptive_codec() -> MediaCodec {
396        // Codec that can adjust quality based on network conditions
397        let config = EncodingConfig {
398            quality: QualityLevel::Medium, // Start with medium
399            target_bitrate: 1_000_000,
400            keyframe_interval: 60, // Longer interval for adaptive streaming
401            enable_compression: true,
402            preserve_metadata: true,
403        };
404
405        MediaCodec::new(config)
406    }
407}