rs2_stream/media/chunk_processor.rs

//! Chunk processing pipeline for media streaming
//!
//! Handles chunk validation, sequencing, buffering, and delivery.
//! Integrates with RS2Stream for backpressure and parallel processing.

use super::codec::{CodecError, MediaCodec};
use super::types::*;
use crate::queue::Queue;
use crate::*;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::Instant;

/// Errors that can occur during chunk processing
#[derive(Debug, Clone)]
pub enum ChunkProcessingError {
    SequenceGap { expected: u64, received: u64 },
    DuplicateChunk(u64),
    BufferOverflow,
    ValidationFailed(String),
    CodecError(String),
    Timeout,
}

impl std::fmt::Display for ChunkProcessingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChunkProcessingError::SequenceGap { expected, received } => {
                write!(
                    f,
                    "Sequence gap: expected {}, received {}",
                    expected, received
                )
            }
            ChunkProcessingError::DuplicateChunk(seq) => {
                write!(f, "Duplicate chunk: {}", seq)
            }
            ChunkProcessingError::BufferOverflow => write!(f, "Buffer overflow"),
            ChunkProcessingError::ValidationFailed(reason) => {
                write!(f, "Validation failed: {}", reason)
            }
            ChunkProcessingError::CodecError(reason) => {
                write!(f, "Codec error: {}", reason)
            }
            ChunkProcessingError::Timeout => write!(f, "Processing timeout"),
        }
    }
}

impl std::error::Error for ChunkProcessingError {}

/// Configuration for chunk processing
#[derive(Debug, Clone)]
pub struct ChunkProcessorConfig {
    pub max_buffer_size: usize,
    pub sequence_timeout: Duration,
    pub enable_reordering: bool,
    pub max_reorder_window: usize,
    pub enable_validation: bool,
    pub parallel_processing: usize,
}

impl Default for ChunkProcessorConfig {
    fn default() -> Self {
        Self {
            max_buffer_size: 1024,
            sequence_timeout: Duration::from_secs(5),
            enable_reordering: true,
            max_reorder_window: 32,
            enable_validation: true,
            parallel_processing: 4,
        }
    }
}
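
// Usage sketch (illustrative values only): override individual fields via
// struct update syntax and fall back to the defaults above for the rest.
//
//     let config = ChunkProcessorConfig {
//         max_reorder_window: 64,
//         parallel_processing: 8,
//         ..Default::default()
//     };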

/// Tracks state for chunk reordering
#[derive(Debug)]
struct ReorderBuffer {
    buffer: VecDeque<MediaChunk>,
    next_expected_sequence: u64,
    last_received_time: Instant,
    max_size: usize,
    // Track sequence numbers for O(1) duplicate detection
    sequence_numbers: std::collections::HashSet<u64>,
}

impl ReorderBuffer {
    fn new(max_size: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            next_expected_sequence: 0,
            last_received_time: Instant::now(),
            max_size,
            sequence_numbers: std::collections::HashSet::new(),
        }
    }

    fn try_insert(&mut self, chunk: MediaChunk) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        if self.buffer.len() >= self.max_size {
            return Err(ChunkProcessingError::BufferOverflow);
        }

        self.last_received_time = Instant::now();

        let seq_num = chunk.sequence_number;

        // Reject chunks that were already delivered; without this check a
        // late replay (whose number was removed from the set on delivery)
        // would sit at the front of the buffer forever, since it can never
        // match next_expected_sequence again.
        if seq_num < self.next_expected_sequence {
            return Err(ChunkProcessingError::DuplicateChunk(seq_num));
        }

        // Check for buffered duplicates using an O(1) HashSet lookup
        if self.sequence_numbers.contains(&seq_num) {
            return Err(ChunkProcessingError::DuplicateChunk(seq_num));
        }

        // Insert in sequence order
        let insert_pos = self
            .buffer
            .binary_search_by_key(&seq_num, |c| c.sequence_number)
            .unwrap_or_else(|pos| pos);

        self.sequence_numbers.insert(seq_num);
        self.buffer.insert(insert_pos, chunk);

        // Extract any chunks that are now in sequence
        self.extract_ready_chunks()
    }

    fn extract_ready_chunks(&mut self) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let mut ready_chunks = Vec::new();

        while let Some(chunk) = self.buffer.front() {
            if chunk.sequence_number == self.next_expected_sequence {
                let chunk = self.buffer.pop_front().unwrap();

                // Remove from the sequence numbers set
                self.sequence_numbers.remove(&chunk.sequence_number);

                self.next_expected_sequence += 1;
                ready_chunks.push(chunk);
            } else {
                break;
            }
        }

        Ok(ready_chunks)
    }

    /// Drain the buffer unconditionally, gaps included, and advance the
    /// expected sequence past the last drained chunk.
    fn force_flush(&mut self) -> Vec<MediaChunk> {
        let chunks: Vec<_> = self.buffer.drain(..).collect();

        // Clear the sequence numbers set as the buffer is now empty
        self.sequence_numbers.clear();

        if let Some(last_chunk) = chunks.last() {
            self.next_expected_sequence = last_chunk.sequence_number + 1;
        }
        chunks
    }

    fn is_timeout(&self, timeout: Duration) -> bool {
        self.last_received_time.elapsed() > timeout
    }
}
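
// Worked example of the windowed reordering above (exercised by the tests at
// the end of this file): with next_expected_sequence = 0, inserting chunk 2
// and then chunk 1 buffers both and releases nothing; inserting chunk 0
// releases [0, 1, 2] in a single call and advances next_expected_sequence to 3.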

/// Statistics for chunk processing
#[derive(Debug, Clone, Default)]
pub struct ChunkProcessorStats {
    pub chunks_processed: u64,
    pub chunks_reordered: u64,
    pub chunks_dropped: u64,
    pub sequence_gaps: u64,
    pub validation_failures: u64,
    pub average_processing_time_ms: f64,
    pub buffer_utilization: f64,
}

/// Main chunk processor
pub struct ChunkProcessor {
    config: ChunkProcessorConfig,
    codec: Arc<MediaCodec>,
    reorder_buffers: Arc<RwLock<HashMap<String, ReorderBuffer>>>,
    stats: Arc<Mutex<ChunkProcessorStats>>,
    output_queue: Arc<Queue<MediaChunk>>,
}

impl ChunkProcessor {
    pub fn new(
        config: ChunkProcessorConfig,
        codec: Arc<MediaCodec>,
        output_queue: Arc<Queue<MediaChunk>>,
    ) -> Self {
        Self {
            config,
            codec,
            reorder_buffers: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(Mutex::new(ChunkProcessorStats::default())),
            output_queue,
        }
    }

    /// Process a stream of incoming chunks
    pub fn process_chunk_stream(
        &self,
        chunk_stream: RS2Stream<MediaChunk>,
    ) -> RS2Stream<Result<MediaChunk, ChunkProcessingError>> {
        let processor = self.clone();

        let stream = chunk_stream.par_eval_map_rs2(self.config.parallel_processing, move |chunk| {
            let processor = processor.clone();
            async move { processor.process_single_chunk(chunk).await }
        });

        auto_backpressure_block(stream, self.config.max_buffer_size)
    }
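
    // Usage sketch (hypothetical wiring; the `codec` and `queue` values are
    // constructed outside this module and the names here are placeholders):
    //
    //     let processor = ChunkProcessor::new(
    //         ChunkProcessorConfig::default(),
    //         codec,  // Arc<MediaCodec>
    //         queue,  // Arc<Queue<MediaChunk>>
    //     );
    //     let results = processor.process_chunk_stream(incoming_chunks);
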
    /// Process a single chunk
    async fn process_single_chunk(
        &self,
        mut chunk: MediaChunk,
    ) -> Result<MediaChunk, ChunkProcessingError> {
        let start_time = Instant::now();

        // Keep a lightweight copy of the stream ID so the result chunk can be
        // built without cloning the payload.
        let original_stream_id = chunk.stream_id.clone();

        // Step 1: Validate the chunk if enabled
        if self.config.enable_validation {
            self.validate_chunk(&chunk).await.map_err(|e| {
                log::warn!(
                    "Chunk validation failed for stream {}: {:?}",
                    chunk.stream_id,
                    e
                );
                e
            })?;
        }

        // Step 2: Assign a sequence number if the chunk arrived without one
        if chunk.sequence_number == 0 {
            chunk.sequence_number = self.generate_sequence_number(&chunk.stream_id).await;
        }
        // Record the number actually used so the result chunk matches the
        // processed chunk even if the reorder buffer advances in the meantime.
        let assigned_seq_num = chunk.sequence_number;

        // Step 3: Handle reordering if enabled
        let ready_chunks = if self.config.enable_reordering {
            self.handle_reordering(chunk).await.map_err(|e| {
                log::warn!(
                    "Reordering failed for stream {}: {:?}",
                    original_stream_id,
                    e
                );
                e
            })?
        } else {
            vec![chunk]
        };

        // Step 4: Enqueue ready chunks to the output queue
        for ready_chunk in ready_chunks {
            if let Err(e) = self.output_queue.try_enqueue(ready_chunk).await {
                // Queue full: count the drop and keep going
                let mut stats = self.stats.lock().await;
                stats.chunks_dropped += 1;
                log::debug!("Failed to enqueue chunk: {:?}", e);
            }
        }

        // Step 5: Update statistics with an incremental running mean
        {
            let mut stats = self.stats.lock().await;
            stats.chunks_processed += 1;
            // Clamp to at least 0.1 ms so sub-millisecond runs do not record zero
            let processing_time = f64::max(0.1, start_time.elapsed().as_millis() as f64);
            stats.average_processing_time_ms = (stats.average_processing_time_ms
                * (stats.chunks_processed - 1) as f64
                + processing_time)
                / stats.chunks_processed as f64;
        }

        // Step 6: Periodic cleanup. A 1% random chance per chunk amortizes the
        // cost of scanning all buffers instead of paying it on every chunk.
        if rand::random::<f32>() < 0.01 {
            log::debug!("Running periodic buffer cleanup");
            self.cleanup_expired_buffers().await;
        }

        // Return a minimal acknowledgement chunk carrying only the identifying
        // fields; the full payloads were already enqueued in Step 4. This is
        // cheaper than cloning the entire chunk up front.
        Ok(MediaChunk {
            stream_id: original_stream_id,
            sequence_number: assigned_seq_num,
            data: Vec::new(),
            chunk_type: ChunkType::Metadata,
            priority: MediaPriority::Normal,
            timestamp: Duration::from_secs(0),
            is_final: false,
            checksum: None,
        })
    }
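
    // Note on Step 5 above: the average uses the standard incremental mean,
    //     avg_n = (avg_{n-1} * (n - 1) + x_n) / n,
    // so only the previous mean and the new count are needed; no per-chunk
    // timing history is stored.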

    /// Validate chunk integrity and format
    async fn validate_chunk(&self, chunk: &MediaChunk) -> Result<(), ChunkProcessingError> {
        // Check basic fields
        if chunk.stream_id.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty stream ID".to_string(),
            ));
        }

        if chunk.data.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty chunk data".to_string(),
            ));
        }

        // Validate checksum if present
        if let Some(expected_checksum) = &chunk.checksum {
            let actual_checksum = self.calculate_checksum(&chunk.data);
            if actual_checksum != *expected_checksum {
                return Err(ChunkProcessingError::ValidationFailed(
                    "Checksum mismatch".to_string(),
                ));
            }
        }

        // Enforce per-type size limits
        match chunk.chunk_type {
            ChunkType::Audio => {
                if chunk.data.len() > 64 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Audio chunk too large".to_string(),
                    ));
                }
            }
            ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
                if chunk.data.len() > 1024 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Video chunk too large".to_string(),
                    ));
                }
            }
            _ => {} // Other types pass through
        }

        Ok(())
    }
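
    // Sketch: producing a checksum that validate_chunk will accept, using the
    // same SHA-256 hex encoding as calculate_checksum below.
    //
    //     use sha2::{Digest, Sha256};
    //     let mut hasher = Sha256::new();
    //     hasher.update(&chunk.data);
    //     chunk.checksum = Some(format!("{:x}", hasher.finalize()));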

    /// Handle chunk reordering
    async fn handle_reordering(
        &self,
        chunk: MediaChunk,
    ) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let stream_id = chunk.stream_id.clone();

        // Get or create the reorder buffer for this stream under a single
        // write lock, then insert the chunk
        let result = {
            let mut buffers = self.reorder_buffers.write().await;
            let buffer = buffers
                .entry(stream_id.clone())
                .or_insert_with(|| ReorderBuffer::new(self.config.max_reorder_window));
            buffer.try_insert(chunk)
        };

        // Update stats based on the result
        match result {
            Ok(ready_chunks) => {
                if ready_chunks.len() > 1 {
                    let mut stats = self.stats.lock().await;
                    stats.chunks_reordered += ready_chunks.len() as u64 - 1;
                }
                Ok(ready_chunks)
            }
            Err(e) => {
                let mut stats = self.stats.lock().await;
                match e {
                    ChunkProcessingError::SequenceGap { .. } => stats.sequence_gaps += 1,
                    ChunkProcessingError::DuplicateChunk(_) => stats.chunks_dropped += 1,
                    _ => stats.validation_failures += 1,
                }
                Err(e)
            }
        }
    }

    /// Generate a sequence number for chunks that arrive without one
    async fn generate_sequence_number(&self, stream_id: &str) -> u64 {
        // Simple implementation - in production this would be more sophisticated
        let buffers = self.reorder_buffers.read().await;
        buffers
            .get(stream_id)
            .map(|buffer| buffer.next_expected_sequence)
            .unwrap_or(0)
    }

    /// Clean up expired reorder buffers
    async fn cleanup_expired_buffers(&self) {
        let mut buffers = self.reorder_buffers.write().await;
        let mut to_remove = Vec::new();
        let mut dropped_chunks = 0;

        for (stream_id, buffer) in buffers.iter_mut() {
            if buffer.is_timeout(self.config.sequence_timeout) {
                // Force flush the expired buffer
                let flushed_chunks = buffer.force_flush();

                log::debug!(
                    "Flushing expired buffer for stream {}: {} chunks",
                    stream_id,
                    flushed_chunks.len()
                );

                // Send flushed chunks to the output queue
                for chunk in flushed_chunks {
                    if let Err(e) = self.output_queue.try_enqueue(chunk).await {
                        dropped_chunks += 1;
                        log::debug!(
                            "Failed to enqueue flushed chunk from stream {}: {:?}",
                            stream_id,
                            e
                        );
                    }
                }

                to_remove.push(stream_id.clone());
            }
        }

        // Update stats if any chunks were dropped
        if dropped_chunks > 0 {
            let mut stats = self.stats.lock().await;
            stats.chunks_dropped += dropped_chunks;
            log::warn!("Dropped {} chunks during buffer cleanup", dropped_chunks);
        }

        // Remove expired buffers
        for stream_id in to_remove {
            log::debug!("Removing expired buffer for stream {}", stream_id);
            buffers.remove(&stream_id);
        }
    }

    /// Calculate checksum for validation
    fn calculate_checksum(&self, data: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(data);
        format!("{:x}", hasher.finalize())
    }

    /// Get processing statistics
    pub async fn get_stats(&self) -> ChunkProcessorStats {
        let mut stats = self.stats.lock().await.clone();

        // Calculate buffer utilization across all active streams
        let buffers = self.reorder_buffers.read().await;
        let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
        let max_possible_size = buffers.len() * self.config.max_reorder_window;

        stats.buffer_utilization = if max_possible_size > 0 {
            total_buffer_size as f64 / max_possible_size as f64
        } else {
            0.0
        };

        stats
    }
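
    // Usage sketch: get_stats returns a point-in-time snapshot, so utilization
    // reflects only the reorder buffers alive at the moment of the call.
    //
    //     let stats = processor.get_stats().await;
    //     println!("processed={} dropped={}", stats.chunks_processed, stats.chunks_dropped);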

    /// Create a monitoring stream for chunk processing
    pub fn create_monitoring_stream(&self) -> RS2Stream<ChunkProcessorStats> {
        let stats = Arc::clone(&self.stats);
        let reorder_buffers = Arc::clone(&self.reorder_buffers);
        let config = self.config.clone();

        tick(Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
            let stats = Arc::clone(&stats);
            let reorder_buffers = Arc::clone(&reorder_buffers);
            let config = config.clone();

            async move {
                let mut current_stats = {
                    let s = stats.lock().await;
                    s.clone()
                };

                // Update buffer utilization, mirroring get_stats
                let buffers = reorder_buffers.read().await;
                let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
                let max_possible_size = buffers.len() * config.max_reorder_window;

                current_stats.buffer_utilization = if max_possible_size > 0 {
                    total_buffer_size as f64 / max_possible_size as f64
                } else {
                    0.0
                };

                current_stats
            }
        })
    }
}

impl Clone for ChunkProcessor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            codec: Arc::clone(&self.codec),
            reorder_buffers: Arc::clone(&self.reorder_buffers),
            stats: Arc::clone(&self.stats),
            output_queue: Arc::clone(&self.output_queue),
        }
    }
}
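
// A minimal behavior sketch for ReorderBuffer, assuming the MediaChunk field
// layout used in process_single_chunk above; values are illustrative only.
#[cfg(test)]
mod tests {
    use super::*;

    fn test_chunk(seq: u64) -> MediaChunk {
        MediaChunk {
            stream_id: "test-stream".to_string(),
            sequence_number: seq,
            data: vec![0u8; 4],
            chunk_type: ChunkType::Metadata,
            priority: MediaPriority::Normal,
            timestamp: Duration::from_secs(0),
            is_final: false,
            checksum: None,
        }
    }

    #[test]
    fn reorder_buffer_releases_in_sequence() {
        let mut buf = ReorderBuffer::new(8);

        // Chunk 1 arrives early: buffered, nothing ready yet
        assert!(buf.try_insert(test_chunk(1)).unwrap().is_empty());

        // Chunk 0 fills the gap: both chunks are released in order
        let ready = buf.try_insert(test_chunk(0)).unwrap();
        let seqs: Vec<u64> = ready.iter().map(|c| c.sequence_number).collect();
        assert_eq!(seqs, vec![0, 1]);

        // A replay of an already-delivered chunk is rejected as a duplicate
        assert!(matches!(
            buf.try_insert(test_chunk(0)),
            Err(ChunkProcessingError::DuplicateChunk(0))
        ));
    }
}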