use super::codec::MediaCodec;
use super::types::*;
use crate::queue::Queue;
use crate::*;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::Instant;
#[derive(Debug, Clone)]
pub enum ChunkProcessingError {
SequenceGap { expected: u64, received: u64 },
DuplicateChunk(u64),
BufferOverflow,
ValidationFailed(String),
CodecError(String),
Timeout,
}
impl std::fmt::Display for ChunkProcessingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ChunkProcessingError::SequenceGap { expected, received } => {
write!(
f,
"Sequence gap: expected {}, received {}",
expected, received
)
}
ChunkProcessingError::DuplicateChunk(seq) => {
write!(f, "Duplicate chunk: {}", seq)
}
ChunkProcessingError::BufferOverflow => write!(f, "Buffer overflow"),
ChunkProcessingError::ValidationFailed(reason) => {
write!(f, "Validation failed: {}", reason)
}
ChunkProcessingError::CodecError(reason) => {
write!(f, "Codec error: {}", reason)
}
ChunkProcessingError::Timeout => write!(f, "Processing timeout"),
}
}
}
impl std::error::Error for ChunkProcessingError {}
#[derive(Debug, Clone)]
pub struct ChunkProcessorConfig {
pub max_buffer_size: usize,
pub sequence_timeout: Duration,
pub enable_reordering: bool,
pub max_reorder_window: usize,
pub enable_validation: bool,
pub parallel_processing: usize,
}
impl Default for ChunkProcessorConfig {
fn default() -> Self {
Self {
max_buffer_size: 1024,
sequence_timeout: Duration::from_secs(5),
enable_reordering: true,
max_reorder_window: 32,
enable_validation: true,
parallel_processing: 4,
}
}
}
#[derive(Debug)]
struct ReorderBuffer {
buffer: VecDeque<MediaChunk>,
next_expected_sequence: u64,
last_received_time: Instant,
max_size: usize,
sequence_numbers: std::collections::HashSet<u64>,
}
impl ReorderBuffer {
fn new(max_size: usize) -> Self {
Self {
buffer: VecDeque::new(),
next_expected_sequence: 0,
last_received_time: Instant::now(),
max_size,
sequence_numbers: std::collections::HashSet::new(),
}
}
fn try_insert(&mut self, chunk: MediaChunk) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
if self.buffer.len() >= self.max_size {
return Err(ChunkProcessingError::BufferOverflow);
}
self.last_received_time = Instant::now();
let _seq_num = chunk.sequence_number;
if self.sequence_numbers.contains(&_seq_num) {
return Err(ChunkProcessingError::DuplicateChunk(_seq_num));
}
if _seq_num > self.next_expected_sequence && !self.buffer.is_empty() {
let max_allowed = self.next_expected_sequence + self.max_size as u64;
if _seq_num > max_allowed {
return Err(ChunkProcessingError::SequenceGap {
expected: self.next_expected_sequence,
received: _seq_num,
});
}
} else if _seq_num > self.next_expected_sequence + self.max_size as u64 {
return Err(ChunkProcessingError::SequenceGap {
expected: self.next_expected_sequence,
received: _seq_num,
});
}
let insert_pos = self
.buffer
.binary_search_by_key(&_seq_num, |c| c.sequence_number)
.unwrap_or_else(|pos| pos);
self.sequence_numbers.insert(_seq_num);
self.buffer.insert(insert_pos, chunk);
self.extract_ready_chunks()
}
fn extract_ready_chunks(&mut self) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
let mut ready_chunks = Vec::new();
while let Some(chunk) = self.buffer.front() {
if chunk.sequence_number == self.next_expected_sequence {
let _seq_num = chunk.sequence_number;
let chunk = self.buffer.pop_front().unwrap();
self.sequence_numbers.remove(&_seq_num);
self.next_expected_sequence += 1;
ready_chunks.push(chunk);
} else {
break;
}
}
Ok(ready_chunks)
}
fn force_flush(&mut self) -> Vec<MediaChunk> {
let chunks: Vec<_> = self.buffer.drain(..).collect();
self.sequence_numbers.clear();
if let Some(last_chunk) = chunks.last() {
self.next_expected_sequence = last_chunk.sequence_number + 1;
}
chunks
}
fn is_timeout(&self, timeout: Duration) -> bool {
self.last_received_time.elapsed() > timeout
}
}
#[derive(Debug, Clone, Default)]
pub struct ChunkProcessorStats {
pub chunks_processed: u64,
pub chunks_reordered: u64,
pub chunks_dropped: u64,
pub sequence_gaps: u64,
pub validation_failures: u64,
pub average_processing_time_ms: f64,
pub buffer_utilization: f64,
}
pub struct ChunkProcessor {
config: ChunkProcessorConfig,
codec: Arc<MediaCodec>,
reorder_buffers: Arc<RwLock<HashMap<String, ReorderBuffer>>>,
stats: Arc<Mutex<ChunkProcessorStats>>,
output_queue: Arc<Queue<MediaChunk>>,
}
impl ChunkProcessor {
pub fn new(
config: ChunkProcessorConfig,
codec: Arc<MediaCodec>,
output_queue: Arc<Queue<MediaChunk>>,
) -> Self {
Self {
config,
codec,
reorder_buffers: Arc::new(RwLock::new(HashMap::new())),
stats: Arc::new(Mutex::new(ChunkProcessorStats::default())),
output_queue,
}
}
pub fn process_chunk_stream(
&self,
chunk_stream: RS2Stream<MediaChunk>,
) -> RS2Stream<Result<MediaChunk, ChunkProcessingError>> {
let processor = self.clone();
let stream = chunk_stream.par_eval_map_rs2(self.config.parallel_processing, move |chunk| {
let processor = processor.clone();
async move { processor.process_single_chunk(chunk).await }
});
auto_backpressure_block(stream, self.config.max_buffer_size)
}
async fn process_single_chunk(
&self,
mut chunk: MediaChunk,
) -> Result<MediaChunk, ChunkProcessingError> {
let start_time = Instant::now();
let original_seq_num = chunk.sequence_number;
let original_stream_id = chunk.stream_id.clone();
if self.config.enable_validation {
self.validate_chunk(&chunk).await.map_err(|e| {
log::warn!(
"Chunk validation failed for stream {}: {:?}",
chunk.stream_id,
e
);
e
})?;
}
if chunk.sequence_number == 0 {
chunk.sequence_number = self.generate_sequence_number(&chunk.stream_id).await;
}
let ready_chunks = if self.config.enable_reordering {
self.handle_reordering(chunk).await.map_err(|e| {
log::warn!(
"Reordering failed for stream {}: {:?}",
original_stream_id,
e
);
e
})?
} else {
vec![chunk]
};
for ready_chunk in ready_chunks {
if let Err(e) = self.output_queue.try_enqueue(ready_chunk).await {
let mut stats = self.stats.lock().await;
stats.chunks_dropped += 1;
log::debug!("Failed to enqueue chunk: {:?}", e);
}
}
{
let mut stats = self.stats.lock().await;
stats.chunks_processed += 1;
let processing_time = f64::max(0.1, start_time.elapsed().as_millis() as f64);
stats.average_processing_time_ms = (stats.average_processing_time_ms
* (stats.chunks_processed - 1) as f64
+ processing_time)
/ stats.chunks_processed as f64;
}
if rand::random::<f32>() < 0.01 {
log::debug!("Running periodic buffer cleanup");
self.cleanup_expired_buffers().await;
}
let stream_id_for_result = original_stream_id.clone();
Ok(MediaChunk {
stream_id: stream_id_for_result,
sequence_number: if original_seq_num == 0 {
self.generate_sequence_number(&original_stream_id).await
} else {
original_seq_num
},
data: Vec::new(),
chunk_type: ChunkType::Metadata,
priority: MediaPriority::Normal,
timestamp: Duration::from_secs(0),
is_final: false,
checksum: None,
})
}
async fn validate_chunk(&self, chunk: &MediaChunk) -> Result<(), ChunkProcessingError> {
if chunk.stream_id.is_empty() {
return Err(ChunkProcessingError::ValidationFailed(
"Empty stream ID".to_string(),
));
}
if chunk.data.is_empty() {
return Err(ChunkProcessingError::ValidationFailed(
"Empty chunk data".to_string(),
));
}
if let Some(expected_checksum) = &chunk.checksum {
let actual_checksum = self.calculate_checksum(&chunk.data);
if actual_checksum != *expected_checksum {
return Err(ChunkProcessingError::ValidationFailed(
"Checksum mismatch".to_string(),
));
}
}
match chunk.chunk_type {
ChunkType::Audio => {
if chunk.data.len() > 64 * 1024 {
return Err(ChunkProcessingError::ValidationFailed(
"Audio chunk too large".to_string(),
));
}
}
ChunkType::VideoIFrame | ChunkType::VideoPFrame | ChunkType::VideoBFrame => {
if chunk.data.len() > 1024 * 1024 {
return Err(ChunkProcessingError::ValidationFailed(
"Video chunk too large".to_string(),
));
}
}
_ => {} }
Ok(())
}
async fn handle_reordering(
&self,
chunk: MediaChunk,
) -> Result<Vec<MediaChunk>, ChunkProcessingError> {
let stream_id = chunk.stream_id.clone();
let result = {
let mut buffers = self.reorder_buffers.write().await;
if !buffers.contains_key(&stream_id) {
buffers.insert(
stream_id.clone(),
ReorderBuffer::new(self.config.max_reorder_window),
);
}
let buffer = buffers.get_mut(&stream_id).unwrap();
buffer.try_insert(chunk)
};
match result {
Ok(ready_chunks) => {
if ready_chunks.len() > 1 {
let mut stats = self.stats.lock().await;
stats.chunks_reordered += ready_chunks.len() as u64 - 1;
}
Ok(ready_chunks)
}
Err(e) => {
let mut stats = self.stats.lock().await;
match e {
ChunkProcessingError::SequenceGap { .. } => stats.sequence_gaps += 1,
ChunkProcessingError::DuplicateChunk(_) => stats.chunks_dropped += 1,
_ => stats.validation_failures += 1,
}
Err(e)
}
}
}
async fn generate_sequence_number(&self, stream_id: &str) -> u64 {
let buffers = self.reorder_buffers.read().await;
if let Some(buffer) = buffers.get(stream_id) {
buffer.next_expected_sequence
} else {
0
}
}
async fn cleanup_expired_buffers(&self) {
let mut buffers = self.reorder_buffers.write().await;
let mut to_remove = Vec::new();
let mut dropped_chunks = 0;
for (stream_id, buffer) in buffers.iter_mut() {
if buffer.is_timeout(self.config.sequence_timeout) {
let flushed_chunks = buffer.force_flush();
log::debug!(
"Flushing expired buffer for stream {}: {} chunks",
stream_id,
flushed_chunks.len()
);
for chunk in flushed_chunks {
if let Err(e) = self.output_queue.try_enqueue(chunk).await {
dropped_chunks += 1;
log::debug!(
"Failed to enqueue flushed chunk from stream {}: {:?}",
stream_id,
e
);
}
}
to_remove.push(stream_id.clone());
}
}
if dropped_chunks > 0 {
let mut stats = self.stats.lock().await;
stats.chunks_dropped += dropped_chunks;
log::warn!("Dropped {} chunks during buffer cleanup", dropped_chunks);
}
for stream_id in to_remove {
log::debug!("Removing expired buffer for stream {}", stream_id);
buffers.remove(&stream_id);
}
}
fn calculate_checksum(&self, data: &[u8]) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
pub async fn get_stats(&self) -> ChunkProcessorStats {
let stats = self.stats.lock().await;
let mut stats = stats.clone();
let buffers = self.reorder_buffers.read().await;
let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
let max_possible_size = buffers.len() * self.config.max_reorder_window;
stats.buffer_utilization = if max_possible_size > 0 {
total_buffer_size as f64 / max_possible_size as f64
} else {
0.0
};
stats
}
pub fn create_monitoring_stream(&self) -> RS2Stream<ChunkProcessorStats> {
let stats = Arc::clone(&self.stats);
let reorder_buffers = Arc::clone(&self.reorder_buffers);
let config = self.config.clone();
tick(Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
let stats = Arc::clone(&stats);
let reorder_buffers = Arc::clone(&reorder_buffers);
let config = config.clone();
async move {
let mut current_stats = {
let s = stats.lock().await;
s.clone()
};
let buffers = reorder_buffers.read().await;
let total_buffer_size: usize = buffers.values().map(|b| b.buffer.len()).sum();
let max_possible_size = buffers.len() * config.max_reorder_window;
current_stats.buffer_utilization = if max_possible_size > 0 {
total_buffer_size as f64 / max_possible_size as f64
} else {
0.0
};
current_stats
}
})
}
}
impl Clone for ChunkProcessor {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
codec: Arc::clone(&self.codec),
reorder_buffers: Arc::clone(&self.reorder_buffers),
stats: Arc::clone(&self.stats),
output_queue: Arc::clone(&self.output_queue),
}
}
}