use super::codec::MediaCodec;
use super::types::*;
use crate::queue::Queue;
use crate::*;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::Instant;

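/// Errors raised while validating, reordering, or delivering media chunks.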
#[derive(Debug, Clone)]
pub enum ChunkProcessingError {
    SequenceGap { expected: u64, received: u64 },
    DuplicateChunk(u64),
    BufferOverflow,
    ValidationFailed(String),
    CodecError(String),
    Timeout,
}

impl std::fmt::Display for ChunkProcessingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChunkProcessingError::SequenceGap { expected, received } => {
                write!(f, "Sequence gap: expected {}, received {}", expected, received)
            }
            ChunkProcessingError::DuplicateChunk(seq) => {
                write!(f, "Duplicate chunk: {}", seq)
            }
            ChunkProcessingError::BufferOverflow => write!(f, "Buffer overflow"),
            ChunkProcessingError::ValidationFailed(reason) => {
                write!(f, "Validation failed: {}", reason)
            }
            ChunkProcessingError::CodecError(reason) => {
                write!(f, "Codec error: {}", reason)
            }
            ChunkProcessingError::Timeout => write!(f, "Processing timeout"),
        }
    }
}

impl std::error::Error for ChunkProcessingError {}

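/// Tuning knobs for [`ChunkProcessor`]: buffering, reordering, validation,
/// and parallelism.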
#[derive(Debug, Clone)]
pub struct ChunkProcessorConfig {
    pub max_buffer_size: usize,
    pub sequence_timeout: Duration,
    pub enable_reordering: bool,
    pub max_reorder_window: usize,
    pub enable_validation: bool,
    pub parallel_processing: usize,
}

impl Default for ChunkProcessorConfig {
    fn default() -> Self {
        Self {
            max_buffer_size: 1024,
            sequence_timeout: Duration::from_secs(5),
            enable_reordering: true,
            max_reorder_window: 32,
            enable_validation: true,
            parallel_processing: 4,
        }
    }
}

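/// Per-stream reorder buffer. Chunks are kept sorted by sequence number and
/// released once a contiguous run starting at `next_expected_sequence` is
/// available.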
#[derive(Debug)]
struct ReorderBuffer {
    buffer: VecDeque<MediaChunk>,
    next_expected_sequence: u64,
    last_received_time: Instant,
    max_size: usize,
    sequence_numbers: std::collections::HashSet<u64>,
}

impl ReorderBuffer {
    fn new(max_size: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            next_expected_sequence: 0,
            last_received_time: Instant::now(),
            max_size,
            sequence_numbers: std::collections::HashSet::new(),
        }
    }

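    /// Inserts `chunk` in sorted order and returns every chunk that is now
    /// contiguous with `next_expected_sequence`, ready for delivery.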
    fn try_insert(&mut self, chunk: MediaChunk) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        if self.buffer.len() >= self.max_size {
            return Err(ChunkProcessingError::BufferOverflow);
        }

        self.last_received_time = Instant::now();

        let seq_num = chunk.sequence_number;
        // Reject chunks that are already buffered or were already delivered;
        // a stale sequence number would otherwise sit at the front of the
        // buffer and never match `next_expected_sequence`.
        if seq_num < self.next_expected_sequence || self.sequence_numbers.contains(&seq_num) {
            return Err(ChunkProcessingError::DuplicateChunk(seq_num));
        }

        // Keep the buffer sorted by sequence number.
        let insert_pos = self
            .buffer
            .binary_search_by_key(&seq_num, |c| c.sequence_number)
            .unwrap_or_else(|pos| pos);

        self.sequence_numbers.insert(seq_num);
        self.buffer.insert(insert_pos, chunk);

        self.extract_ready_chunks()
    }

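    /// Pops chunks from the front of the buffer while they exactly match the
    /// next expected sequence number.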
    fn extract_ready_chunks(&mut self) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let mut ready_chunks = Vec::new();

        while let Some(chunk) = self.buffer.front() {
            if chunk.sequence_number == self.next_expected_sequence {
                let chunk = self.buffer.pop_front().unwrap();
                self.sequence_numbers.remove(&chunk.sequence_number);
                self.next_expected_sequence += 1;
                ready_chunks.push(chunk);
            } else {
                break;
            }
        }

        Ok(ready_chunks)
    }

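    /// Drains the buffer unconditionally (used on timeout), advancing
    /// `next_expected_sequence` past the highest buffered sequence number.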
    fn force_flush(&mut self) -> Vec<MediaChunk> {
        let chunks: Vec<_> = self.buffer.drain(..).collect();
        self.sequence_numbers.clear();

        // The buffer is kept sorted, so the last chunk carries the highest
        // sequence number seen so far.
        if let Some(last_chunk) = chunks.last() {
            self.next_expected_sequence = last_chunk.sequence_number + 1;
        }
        chunks
    }

    fn is_timeout(&self, timeout: Duration) -> bool {
        self.last_received_time.elapsed() > timeout
    }
}

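/// Counters exposed via [`ChunkProcessor::get_stats`] and the monitoring
/// stream; `buffer_utilization` is recomputed on each read.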
#[derive(Debug, Clone, Default)]
pub struct ChunkProcessorStats {
    pub chunks_processed: u64,
    pub chunks_reordered: u64,
    pub chunks_dropped: u64,
    pub sequence_gaps: u64,
    pub validation_failures: u64,
    pub average_processing_time_ms: f64,
    pub buffer_utilization: f64,
}

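/// Validates, reorders, and forwards media chunks to a shared output queue
/// while tracking processing statistics.
///
/// A minimal wiring sketch; `MediaCodec::new` and `Queue::bounded` are
/// assumed constructor names here and may differ in this crate:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let codec = Arc::new(MediaCodec::new());     // assumed constructor
/// let output = Arc::new(Queue::bounded(1024)); // assumed constructor
/// let processor = ChunkProcessor::new(ChunkProcessorConfig::default(), codec, output);
///
/// // Each element is either a processed-chunk marker or a per-chunk error,
/// // so callers decide how to react to failures.
/// let results = processor.process_chunk_stream(chunk_stream);
/// ```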
pub struct ChunkProcessor {
    config: ChunkProcessorConfig,
    codec: Arc<MediaCodec>,
    reorder_buffers: Arc<RwLock<HashMap<String, ReorderBuffer>>>,
    stats: Arc<Mutex<ChunkProcessorStats>>,
    output_queue: Arc<Queue<MediaChunk>>,
}

impl ChunkProcessor {
    pub fn new(
        config: ChunkProcessorConfig,
        codec: Arc<MediaCodec>,
        output_queue: Arc<Queue<MediaChunk>>,
    ) -> Self {
        Self {
            config,
            codec,
            reorder_buffers: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(Mutex::new(ChunkProcessorStats::default())),
            output_queue,
        }
    }

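    /// Processes a chunk stream with `parallel_processing` concurrent workers
    /// and applies blocking backpressure capped at `max_buffer_size`.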
    pub fn process_chunk_stream(
        &self,
        chunk_stream: RS2Stream<MediaChunk>,
    ) -> RS2Stream<Result<MediaChunk, ChunkProcessingError>> {
        let processor = self.clone();

        let stream = chunk_stream.par_eval_map_rs2(self.config.parallel_processing, move |chunk| {
            let processor = processor.clone();
            async move { processor.process_single_chunk(chunk).await }
        });

        auto_backpressure_block(stream, self.config.max_buffer_size)
    }

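    /// Runs the full pipeline for one chunk: validation, sequence-number
    /// assignment, reordering, and enqueueing. The actual payloads are
    /// delivered through the output queue; the returned chunk is a
    /// lightweight metadata marker.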
    async fn process_single_chunk(
        &self,
        mut chunk: MediaChunk,
    ) -> Result<MediaChunk, ChunkProcessingError> {
        let start_time = Instant::now();
        let original_stream_id = chunk.stream_id.clone();

        if self.config.enable_validation {
            self.validate_chunk(&chunk).await.map_err(|e| {
                log::warn!(
                    "Chunk validation failed for stream {}: {:?}",
                    chunk.stream_id,
                    e
                );
                e
            })?;
        }

        // Assign a sequence number to chunks that arrive without one, and
        // capture it before `chunk` is moved into the reordering step below.
        if chunk.sequence_number == 0 {
            chunk.sequence_number = self.generate_sequence_number(&chunk.stream_id).await;
        }
        let assigned_seq = chunk.sequence_number;

        let ready_chunks = if self.config.enable_reordering {
            self.handle_reordering(chunk).await.map_err(|e| {
                log::warn!("Reordering failed for stream {}: {:?}", original_stream_id, e);
                e
            })?
        } else {
            vec![chunk]
        };

        for ready_chunk in ready_chunks {
            if let Err(e) = self.output_queue.try_enqueue(ready_chunk).await {
                let mut stats = self.stats.lock().await;
                stats.chunks_dropped += 1;
                log::debug!("Failed to enqueue chunk: {:?}", e);
            }
        }

        {
            let mut stats = self.stats.lock().await;
            stats.chunks_processed += 1;
            // Maintain an incremental running average of processing time;
            // sub-millisecond durations are preserved as fractions.
            let processing_time = start_time.elapsed().as_secs_f64() * 1000.0;
            stats.average_processing_time_ms = (stats.average_processing_time_ms
                * (stats.chunks_processed - 1) as f64
                + processing_time)
                / stats.chunks_processed as f64;
        }

        // Opportunistic cleanup: roughly 1% of chunks trigger a sweep of
        // expired reorder buffers instead of using a dedicated timer.
        if rand::random::<f32>() < 0.01 {
            log::debug!("Running periodic buffer cleanup");
            self.cleanup_expired_buffers().await;
        }

        // The payloads were already enqueued above; return a lightweight
        // metadata marker describing the processed chunk.
        Ok(MediaChunk {
            stream_id: original_stream_id,
            sequence_number: assigned_seq,
            data: Vec::new(),
            chunk_type: ChunkType::Metadata,
            priority: MediaPriority::Normal,
            timestamp: Duration::from_secs(0),
            is_final: false,
            checksum: None,
        })
    }

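    /// Structural validation: non-empty IDs and payloads, checksum agreement,
    /// and per-type size limits.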
    async fn validate_chunk(&self, chunk: &MediaChunk) -> Result<(), ChunkProcessingError> {
        if chunk.stream_id.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty stream ID".to_string(),
            ));
        }

        if chunk.data.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty chunk data".to_string(),
            ));
        }

        // Verify integrity when the producer attached a checksum.
        if let Some(expected_checksum) = &chunk.checksum {
            let actual_checksum = self.calculate_checksum(&chunk.data);
            if actual_checksum != *expected_checksum {
                return Err(ChunkProcessingError::ValidationFailed(
                    "Checksum mismatch".to_string(),
                ));
            }
        }

        // Per-type size limits: 64 KiB for audio, 1 MiB for video frames.
        match chunk.chunk_type {
            ChunkType::Audio => {
                if chunk.data.len() > 64 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Audio chunk too large".to_string(),
                    ));
                }
            }
            ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
                if chunk.data.len() > 1024 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Video chunk too large".to_string(),
                    ));
                }
            }
            _ => {}
        }

        Ok(())
    }

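    /// Routes a chunk through its stream's reorder buffer, creating the
    /// buffer on first sight, and records reordering statistics.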
    async fn handle_reordering(
        &self,
        chunk: MediaChunk,
    ) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let stream_id = chunk.stream_id.clone();

        // Scope the write lock so it is released before the stats mutex is
        // taken below.
        let result = {
            let mut buffers = self.reorder_buffers.write().await;
            let buffer = buffers
                .entry(stream_id)
                .or_insert_with(|| ReorderBuffer::new(self.config.max_reorder_window));
            buffer.try_insert(chunk)
        };

        match result {
            Ok(ready_chunks) => {
                if ready_chunks.len() > 1 {
                    let mut stats = self.stats.lock().await;
                    stats.chunks_reordered += ready_chunks.len() as u64 - 1;
                }
                Ok(ready_chunks)
            }
            Err(e) => {
                let mut stats = self.stats.lock().await;
                match e {
                    ChunkProcessingError::SequenceGap { .. } => stats.sequence_gaps += 1,
                    ChunkProcessingError::DuplicateChunk(_) => stats.chunks_dropped += 1,
                    _ => stats.validation_failures += 1,
                }
                Err(e)
            }
        }
    }

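    /// Picks the next sequence number a stream's reorder buffer expects;
    /// streams without a buffer start at 0.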
    async fn generate_sequence_number(&self, stream_id: &str) -> u64 {
        let buffers = self.reorder_buffers.read().await;
        buffers
            .get(stream_id)
            .map(|buffer| buffer.next_expected_sequence)
            .unwrap_or(0)
    }

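    /// Flushes and removes reorder buffers that have not received a chunk
    /// within `sequence_timeout`, forwarding whatever they still hold to the
    /// output queue.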
    async fn cleanup_expired_buffers(&self) {
        let mut buffers = self.reorder_buffers.write().await;
        let mut to_remove = Vec::new();
        let mut dropped_chunks = 0u64;

        for (stream_id, buffer) in buffers.iter_mut() {
            if buffer.is_timeout(self.config.sequence_timeout) {
                // Deliver whatever is buffered, gaps and all, rather than
                // holding chunks for a stream that has gone quiet.
                let flushed_chunks = buffer.force_flush();

                log::debug!(
                    "Flushing expired buffer for stream {}: {} chunks",
                    stream_id,
                    flushed_chunks.len()
                );

                for chunk in flushed_chunks {
                    if let Err(e) = self.output_queue.try_enqueue(chunk).await {
                        dropped_chunks += 1;
                        log::debug!(
                            "Failed to enqueue flushed chunk from stream {}: {:?}",
                            stream_id,
                            e
                        );
                    }
                }

                to_remove.push(stream_id.clone());
            }
        }

        for stream_id in to_remove {
            log::debug!("Removing expired buffer for stream {}", stream_id);
            buffers.remove(&stream_id);
        }

        // Release the buffer lock before touching the stats mutex so no task
        // ever holds both locks at once.
        drop(buffers);

        if dropped_chunks > 0 {
            let mut stats = self.stats.lock().await;
            stats.chunks_dropped += dropped_chunks;
            log::warn!("Dropped {} chunks during buffer cleanup", dropped_chunks);
        }
    }

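    /// SHA-256 digest of the payload, hex-encoded to match chunk checksums.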
    fn calculate_checksum(&self, data: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(data);
        format!("{:x}", hasher.finalize())
    }

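    /// Returns a snapshot of the counters with `buffer_utilization` computed
    /// from the current reorder-buffer occupancy.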
    pub async fn get_stats(&self) -> ChunkProcessorStats {
        // Clone inside a scope so the stats mutex is released before the
        // buffer lock is taken.
        let mut stats = {
            let guard = self.stats.lock().await;
            (*guard).clone()
        };

        let buffers = self.reorder_buffers.read().await;
        let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
        let max_possible_size = buffers.len() * self.config.max_reorder_window;

        stats.buffer_utilization = if max_possible_size > 0 {
            total_buffer_size as f64 / max_possible_size as f64
        } else {
            0.0
        };

        stats
    }

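    /// Emits a [`ChunkProcessorStats`] snapshot once per second, suitable for
    /// feeding dashboards or health checks.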
    pub fn create_monitoring_stream(&self) -> RS2Stream<ChunkProcessorStats> {
        let stats = Arc::clone(&self.stats);
        let reorder_buffers = Arc::clone(&self.reorder_buffers);
        let config = self.config.clone();

        tick(Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
            let stats = Arc::clone(&stats);
            let reorder_buffers = Arc::clone(&reorder_buffers);
            let config = config.clone();

            async move {
                // Clone under the lock, then release it before reading the
                // reorder buffers.
                let mut current_stats = {
                    let s = stats.lock().await;
                    s.clone()
                };

                let buffers = reorder_buffers.read().await;
                let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
                let max_possible_size = buffers.len() * config.max_reorder_window;

                current_stats.buffer_utilization = if max_possible_size > 0 {
                    total_buffer_size as f64 / max_possible_size as f64
                } else {
                    0.0
                };

                current_stats
            }
        })
    }
}

impl Clone for ChunkProcessor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            codec: Arc::clone(&self.codec),
            reorder_buffers: Arc::clone(&self.reorder_buffers),
            stats: Arc::clone(&self.stats),
            output_queue: Arc::clone(&self.output_queue),
        }
    }
}