use super::codec::MediaCodec;
use super::types::*;
use crate::queue::Queue;
use crate::*;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::Instant;

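/// Errors produced while validating, reordering, and delivering media chunks.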
#[derive(Debug, Clone)]
pub enum ChunkProcessingError {
    SequenceGap { expected: u64, received: u64 },
    DuplicateChunk(u64),
    BufferOverflow,
    ValidationFailed(String),
    CodecError(String),
    Timeout,
}

impl std::fmt::Display for ChunkProcessingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ChunkProcessingError::SequenceGap { expected, received } => {
                write!(f, "Sequence gap: expected {}, received {}", expected, received)
            }
            ChunkProcessingError::DuplicateChunk(seq) => {
                write!(f, "Duplicate chunk: {}", seq)
            }
            ChunkProcessingError::BufferOverflow => write!(f, "Buffer overflow"),
            ChunkProcessingError::ValidationFailed(reason) => {
                write!(f, "Validation failed: {}", reason)
            }
            ChunkProcessingError::CodecError(reason) => {
                write!(f, "Codec error: {}", reason)
            }
            ChunkProcessingError::Timeout => write!(f, "Processing timeout"),
        }
    }
}

impl std::error::Error for ChunkProcessingError {}

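/// Tunable limits and feature flags for [`ChunkProcessor`].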
#[derive(Debug, Clone)]
pub struct ChunkProcessorConfig {
    pub max_buffer_size: usize,
    pub sequence_timeout: Duration,
    pub enable_reordering: bool,
    pub max_reorder_window: usize,
    pub enable_validation: bool,
    pub parallel_processing: usize,
}

impl Default for ChunkProcessorConfig {
    fn default() -> Self {
        Self {
            max_buffer_size: 1024,
            sequence_timeout: Duration::from_secs(5),
            enable_reordering: true,
            max_reorder_window: 32,
            enable_validation: true,
            parallel_processing: 4,
        }
    }
}

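/// Per-stream buffer that holds out-of-order chunks until they can be
/// released in sequence-number order.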
#[derive(Debug)]
struct ReorderBuffer {
    buffer: VecDeque<MediaChunk>,
    next_expected_sequence: u64,
    last_received_time: Instant,
    max_size: usize,
    /// Sequence numbers currently buffered, for O(1) duplicate detection.
    sequence_numbers: std::collections::HashSet<u64>,
}

impl ReorderBuffer {
    fn new(max_size: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            next_expected_sequence: 0,
            last_received_time: Instant::now(),
            max_size,
            sequence_numbers: std::collections::HashSet::new(),
        }
    }

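    /// Inserts `chunk` in sequence order and returns any chunks that are now
    /// deliverable in order, or an error for duplicates, overflow, or gaps
    /// beyond the reorder window.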
    fn try_insert(&mut self, chunk: MediaChunk) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        if self.buffer.len() >= self.max_size {
            return Err(ChunkProcessingError::BufferOverflow);
        }

        self.last_received_time = Instant::now();

        let seq_num = chunk.sequence_number;
        if self.sequence_numbers.contains(&seq_num) {
            return Err(ChunkProcessingError::DuplicateChunk(seq_num));
        }

        // A chunk more than `max_size` ahead of the next expected sequence can
        // never fit in the reorder window, so report it as a gap.
        if seq_num > self.next_expected_sequence + self.max_size as u64 {
            return Err(ChunkProcessingError::SequenceGap {
                expected: self.next_expected_sequence,
                received: seq_num,
            });
        }

        // Keep the buffer sorted by sequence number.
        let insert_pos = self
            .buffer
            .binary_search_by_key(&seq_num, |c| c.sequence_number)
            .unwrap_or_else(|pos| pos);

        self.sequence_numbers.insert(seq_num);
        self.buffer.insert(insert_pos, chunk);

        self.extract_ready_chunks()
    }

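    /// Pops the contiguous run of chunks starting at `next_expected_sequence`
    /// from the front of the (sorted) buffer.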
    fn extract_ready_chunks(&mut self) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let mut ready_chunks = Vec::new();

        while let Some(chunk) = self.buffer.front() {
            if chunk.sequence_number == self.next_expected_sequence {
                let chunk = self.buffer.pop_front().unwrap();
                self.sequence_numbers.remove(&chunk.sequence_number);
                self.next_expected_sequence += 1;
                ready_chunks.push(chunk);
            } else {
                break;
            }
        }

        Ok(ready_chunks)
    }

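    /// Drains every buffered chunk regardless of gaps and advances
    /// `next_expected_sequence` past the last flushed chunk.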
    fn force_flush(&mut self) -> Vec<MediaChunk> {
        let chunks: Vec<_> = self.buffer.drain(..).collect();
        self.sequence_numbers.clear();

        if let Some(last_chunk) = chunks.last() {
            self.next_expected_sequence = last_chunk.sequence_number + 1;
        }
        chunks
    }

    fn is_timeout(&self, timeout: Duration) -> bool {
        self.last_received_time.elapsed() > timeout
    }
}

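/// Counters and gauges describing processor throughput and buffer health.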
#[derive(Debug, Clone, Default)]
pub struct ChunkProcessorStats {
    pub chunks_processed: u64,
    pub chunks_reordered: u64,
    pub chunks_dropped: u64,
    pub sequence_gaps: u64,
    pub validation_failures: u64,
    pub average_processing_time_ms: f64,
    pub buffer_utilization: f64,
}

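/// Validates, optionally reorders, and forwards media chunks to an output
/// queue, maintaining one [`ReorderBuffer`] per stream.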
pub struct ChunkProcessor {
    config: ChunkProcessorConfig,
    codec: Arc<MediaCodec>,
    reorder_buffers: Arc<RwLock<HashMap<String, ReorderBuffer>>>,
    stats: Arc<Mutex<ChunkProcessorStats>>,
    output_queue: Arc<Queue<MediaChunk>>,
}

impl ChunkProcessor {
    pub fn new(
        config: ChunkProcessorConfig,
        codec: Arc<MediaCodec>,
        output_queue: Arc<Queue<MediaChunk>>,
    ) -> Self {
        Self {
            config,
            codec,
            reorder_buffers: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(Mutex::new(ChunkProcessorStats::default())),
            output_queue,
        }
    }

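    /// Processes an incoming chunk stream with bounded parallelism, applying
    /// backpressure once `max_buffer_size` results are in flight.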
    pub fn process_chunk_stream(
        &self,
        chunk_stream: RS2Stream<MediaChunk>,
    ) -> RS2Stream<Result<MediaChunk, ChunkProcessingError>> {
        let processor = self.clone();

        let stream = chunk_stream.par_eval_map_rs2(self.config.parallel_processing, move |chunk| {
            let processor = processor.clone();
            async move { processor.process_single_chunk(chunk).await }
        });

        auto_backpressure_block(stream, self.config.max_buffer_size)
    }

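    /// Runs one chunk through validation and reordering, enqueues any chunks
    /// that became deliverable, and updates processing statistics.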
    async fn process_single_chunk(
        &self,
        mut chunk: MediaChunk,
    ) -> Result<MediaChunk, ChunkProcessingError> {
        let start_time = Instant::now();

        let original_seq_num = chunk.sequence_number;
        let original_stream_id = chunk.stream_id.clone();

        if self.config.enable_validation {
            self.validate_chunk(&chunk).await.map_err(|e| {
                log::warn!(
                    "Chunk validation failed for stream {}: {:?}",
                    chunk.stream_id,
                    e
                );
                e
            })?;
        }

        // Sequence number 0 marks an unnumbered chunk; assign the next
        // expected sequence for its stream.
        if chunk.sequence_number == 0 {
            chunk.sequence_number = self.generate_sequence_number(&chunk.stream_id).await;
        }

        let ready_chunks = if self.config.enable_reordering {
            self.handle_reordering(chunk).await.map_err(|e| {
                log::warn!(
                    "Reordering failed for stream {}: {:?}",
                    original_stream_id,
                    e
                );
                e
            })?
        } else {
            vec![chunk]
        };

        for ready_chunk in ready_chunks {
            if let Err(e) = self.output_queue.try_enqueue(ready_chunk).await {
                let mut stats = self.stats.lock().await;
                stats.chunks_dropped += 1;
                log::debug!("Failed to enqueue chunk: {:?}", e);
            }
        }

        {
            let mut stats = self.stats.lock().await;
            stats.chunks_processed += 1;
            // Clamp to 0.1 ms so sub-millisecond timings (truncated to 0 by
            // `as_millis`) still move the running average.
            let processing_time = f64::max(0.1, start_time.elapsed().as_millis() as f64);
            stats.average_processing_time_ms = (stats.average_processing_time_ms
                * (stats.chunks_processed - 1) as f64
                + processing_time)
                / stats.chunks_processed as f64;
        }

        // Sweep expired per-stream buffers on roughly 1% of chunks to amortize
        // cleanup cost across the stream.
        if rand::random::<f32>() < 0.01 {
            log::debug!("Running periodic buffer cleanup");
            self.cleanup_expired_buffers().await;
        }

        // The deliverable chunks were already sent to `output_queue`; return a
        // lightweight metadata marker acknowledging the processed input.
        Ok(MediaChunk {
            stream_id: original_stream_id.clone(),
            sequence_number: if original_seq_num == 0 {
                self.generate_sequence_number(&original_stream_id).await
            } else {
                original_seq_num
            },
            data: Vec::new(),
            chunk_type: ChunkType::Metadata,
            priority: MediaPriority::Normal,
            timestamp: Duration::from_secs(0),
            is_final: false,
            checksum: None,
        })
    }

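    /// Checks stream ID, payload presence, optional checksum, and per-type
    /// size limits.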
    async fn validate_chunk(&self, chunk: &MediaChunk) -> Result<(), ChunkProcessingError> {
        if chunk.stream_id.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty stream ID".to_string(),
            ));
        }

        if chunk.data.is_empty() {
            return Err(ChunkProcessingError::ValidationFailed(
                "Empty chunk data".to_string(),
            ));
        }

        if let Some(expected_checksum) = &chunk.checksum {
            let actual_checksum = self.calculate_checksum(&chunk.data);
            if actual_checksum != *expected_checksum {
                return Err(ChunkProcessingError::ValidationFailed(
                    "Checksum mismatch".to_string(),
                ));
            }
        }

        // Per-type sanity limits: 64 KiB for audio, 1 MiB for video frames.
        match chunk.chunk_type {
            ChunkType::Audio => {
                if chunk.data.len() > 64 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Audio chunk too large".to_string(),
                    ));
                }
            }
            ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
                if chunk.data.len() > 1024 * 1024 {
                    return Err(ChunkProcessingError::ValidationFailed(
                        "Video chunk too large".to_string(),
                    ));
                }
            }
            _ => {}
        }

        Ok(())
    }

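    /// Routes a chunk through its stream's reorder buffer, creating the
    /// buffer on first use, and records reorder/gap/duplicate statistics.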
    async fn handle_reordering(
        &self,
        chunk: MediaChunk,
    ) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
        let stream_id = chunk.stream_id.clone();

        let result = {
            let mut buffers = self.reorder_buffers.write().await;
            let buffer = buffers
                .entry(stream_id)
                .or_insert_with(|| ReorderBuffer::new(self.config.max_reorder_window));
            buffer.try_insert(chunk)
        };

        match result {
            Ok(ready_chunks) => {
                if ready_chunks.len() > 1 {
                    let mut stats = self.stats.lock().await;
                    stats.chunks_reordered += ready_chunks.len() as u64 - 1;
                }
                Ok(ready_chunks)
            }
            Err(e) => {
                let mut stats = self.stats.lock().await;
                match e {
                    ChunkProcessingError::SequenceGap { .. } => stats.sequence_gaps += 1,
                    ChunkProcessingError::DuplicateChunk(_) => stats.chunks_dropped += 1,
                    _ => stats.validation_failures += 1,
                }
                Err(e)
            }
        }
    }

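    /// Returns the next expected sequence number for `stream_id`, or 0 if no
    /// reorder buffer exists for that stream yet.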
    async fn generate_sequence_number(&self, stream_id: &str) -> u64 {
        let buffers = self.reorder_buffers.read().await;
        buffers
            .get(stream_id)
            .map(|buffer| buffer.next_expected_sequence)
            .unwrap_or(0)
    }

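    /// Flushes and removes reorder buffers that have not received a chunk
    /// within `sequence_timeout`, forwarding whatever they still hold.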
    async fn cleanup_expired_buffers(&self) {
        let mut buffers = self.reorder_buffers.write().await;
        let mut to_remove = Vec::new();
        let mut dropped_chunks = 0;

        for (stream_id, buffer) in buffers.iter_mut() {
            if buffer.is_timeout(self.config.sequence_timeout) {
                let flushed_chunks = buffer.force_flush();

                log::debug!(
                    "Flushing expired buffer for stream {}: {} chunks",
                    stream_id,
                    flushed_chunks.len()
                );

                for chunk in flushed_chunks {
                    if let Err(e) = self.output_queue.try_enqueue(chunk).await {
                        dropped_chunks += 1;
                        log::debug!(
                            "Failed to enqueue flushed chunk from stream {}: {:?}",
                            stream_id,
                            e
                        );
                    }
                }

                to_remove.push(stream_id.clone());
            }
        }

        if dropped_chunks > 0 {
            let mut stats = self.stats.lock().await;
            stats.chunks_dropped += dropped_chunks;
            log::warn!("Dropped {} chunks during buffer cleanup", dropped_chunks);
        }

        for stream_id in to_remove {
            log::debug!("Removing expired buffer for stream {}", stream_id);
            buffers.remove(&stream_id);
        }
    }

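    /// Computes a lowercase hex-encoded SHA-256 digest of `data`.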
    fn calculate_checksum(&self, data: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(data);
        format!("{:x}", hasher.finalize())
    }

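    /// Returns a snapshot of current statistics with `buffer_utilization`
    /// recomputed from live reorder-buffer occupancy.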
    pub async fn get_stats(&self) -> ChunkProcessorStats {
        let mut stats = self.stats.lock().await.clone();

        let buffers = self.reorder_buffers.read().await;
        let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
        let max_possible_size = buffers.len() * self.config.max_reorder_window;

        stats.buffer_utilization = if max_possible_size > 0 {
            total_buffer_size as f64 / max_possible_size as f64
        } else {
            0.0
        };

        stats
    }

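    /// Emits a statistics snapshot once per second, recomputing
    /// `buffer_utilization` on each tick.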
    pub fn create_monitoring_stream(&self) -> RS2Stream<ChunkProcessorStats> {
        let stats = Arc::clone(&self.stats);
        let reorder_buffers = Arc::clone(&self.reorder_buffers);
        let config = self.config.clone();

        tick(Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
            let stats = Arc::clone(&stats);
            let reorder_buffers = Arc::clone(&reorder_buffers);
            let config = config.clone();

            async move {
                let mut current_stats = {
                    let s = stats.lock().await;
                    s.clone()
                };

                let buffers = reorder_buffers.read().await;
                let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
                let max_possible_size = buffers.len() * config.max_reorder_window;

                current_stats.buffer_utilization = if max_possible_size > 0 {
                    total_buffer_size as f64 / max_possible_size as f64
                } else {
                    0.0
                };

                current_stats
            }
        })
    }
}

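/// Clones share the reorder buffers, statistics, and output queue through the
/// same `Arc`s, so every handle observes one processor state.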
impl Clone for ChunkProcessor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            codec: Arc::clone(&self.codec),
            reorder_buffers: Arc::clone(&self.reorder_buffers),
            stats: Arc::clone(&self.stats),
            output_queue: Arc::clone(&self.output_queue),
        }
    }
}