use futures_util::StreamExt;
use rs2_stream::media::chunk_processor::{
ChunkProcessingError, ChunkProcessor, ChunkProcessorConfig,
};
use rs2_stream::media::codec::{EncodingConfig, MediaCodec};
use rs2_stream::media::types::{ChunkType, MediaChunk, MediaPriority};
use rs2_stream::queue::Queue;
use rs2_stream::rs2::*;
use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Runtime;
#[test]
fn test_chunk_processor_basic() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let codec = Arc::new(MediaCodec::new(EncodingConfig::default()));
let output_queue = Arc::new(Queue::<MediaChunk>::bounded(10));
let mut config = ChunkProcessorConfig::default();
config.enable_reordering = false; config.enable_validation = false; config.parallel_processing = 1; config.max_buffer_size = 10;
let processor = ChunkProcessor::new(config, codec, output_queue.clone());
let chunk = create_test_chunk("test_stream", 1, ChunkType::VideoIFrame);
let chunk_stream = from_iter(vec![chunk.clone()]);
let mut result_stream = processor.process_chunk_stream(chunk_stream);
let result = result_stream.next().await.unwrap();
assert!(result.is_ok());
let mut dequeue_stream = output_queue.dequeue();
let output_chunk = dequeue_stream.next().await.unwrap();
assert_eq!(output_chunk.stream_id, "test_stream");
assert_eq!(output_chunk.sequence_number, 1);
});
}
#[test]
fn test_chunk_processor_validation() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let codec = Arc::new(MediaCodec::new(EncodingConfig::default()));
let output_queue = Arc::new(Queue::<MediaChunk>::bounded(10));
let mut config = ChunkProcessorConfig::default();
config.enable_validation = true;
let processor = ChunkProcessor::new(config, codec, output_queue.clone());
let invalid_chunk = create_test_chunk("", 1, ChunkType::VideoIFrame);
let invalid_chunk_stream = from_iter(vec![invalid_chunk]);
let mut result_stream = processor.process_chunk_stream(invalid_chunk_stream);
let result = result_stream.next().await.unwrap();
assert!(result.is_err());
match result {
Err(ChunkProcessingError::ValidationFailed(_)) => {
}
_ => panic!("Expected ValidationFailed error"),
}
let valid_chunk = create_test_chunk("test_stream", 2, ChunkType::VideoIFrame);
let valid_chunk_stream = from_iter(vec![valid_chunk]);
let mut result_stream = processor.process_chunk_stream(valid_chunk_stream);
let result = result_stream.next().await.unwrap();
assert!(result.is_ok());
});
}
#[test]
fn test_chunk_processor_stats() {
let rt = Runtime::new().unwrap();
rt.block_on(async {
let codec = Arc::new(MediaCodec::new(EncodingConfig::default()));
let output_queue = Arc::new(Queue::<MediaChunk>::bounded(10));
let mut config = ChunkProcessorConfig::default();
config.enable_reordering = false; config.parallel_processing = 1; config.max_buffer_size = 10;
let processor = ChunkProcessor::new(config, codec, output_queue.clone());
for i in 1..=5 {
let chunk = create_test_chunk("test_stream", i, ChunkType::VideoIFrame);
let chunk_stream = from_iter(vec![chunk.clone()]);
let mut result_stream = processor.process_chunk_stream(chunk_stream);
let result = result_stream.next().await.unwrap();
assert!(result.is_ok());
let mut dequeue_stream = output_queue.dequeue();
let output_chunk = dequeue_stream.next().await.unwrap();
assert_eq!(output_chunk.sequence_number, i);
}
tokio::time::sleep(Duration::from_millis(10)).await;
let stats = processor.get_stats().await;
assert_eq!(
stats.chunks_processed, 5,
"Expected 5 chunks processed, got {}",
stats.chunks_processed
);
assert_eq!(
stats.chunks_dropped, 0,
"Expected 0 chunks dropped, got {}",
stats.chunks_dropped
);
assert!(
stats.average_processing_time_ms > 0.0,
"Expected positive processing time, got {}",
stats.average_processing_time_ms
);
});
}
fn create_test_chunk(stream_id: &str, sequence: u64, chunk_type: ChunkType) -> MediaChunk {
MediaChunk {
stream_id: stream_id.to_string(),
sequence_number: sequence,
data: vec![0u8; 1024], chunk_type,
priority: MediaPriority::Normal,
timestamp: Duration::from_millis(sequence * 33),
is_final: false,
checksum: None,
}
}