use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
#[cfg(test)]
mod tests {
use super::*;
fn chunk_message(data: &[u8], chunk_size: usize) -> Vec<Vec<u8>> {
data.chunks(chunk_size)
.map(|chunk| chunk.to_vec())
.collect()
}
fn reassemble_chunks(chunks: Vec<Vec<u8>>) -> Vec<u8> {
chunks.into_iter().flatten().collect()
}
#[test]
fn test_message_chunking_correctness() {
let test_sizes = vec![100, 1024, 10_000, 100_000, 1_000_000];
let chunk_size = 8192;
for size in test_sizes {
let original: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
let chunks = chunk_message(&original, chunk_size);
let reassembled = reassemble_chunks(chunks.clone());
assert_eq!(original, reassembled, "Failed for size {}", size);
let expected_chunks = (size + chunk_size - 1) / chunk_size;
assert_eq!(
chunks.len(),
expected_chunks,
"Wrong chunk count for size {}",
size
);
println!("✓ Size {} -> {} chunks", size, chunks.len());
}
}
#[test]
fn test_chunking_edge_cases() {
let chunk_size = 1024;
let empty = vec![];
let chunks = chunk_message(&empty, chunk_size);
assert_eq!(chunks.len(), 0);
let one_chunk = vec![42u8; chunk_size];
let chunks = chunk_message(&one_chunk, chunk_size);
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].len(), chunk_size);
let over_one = vec![42u8; chunk_size + 1];
let chunks = chunk_message(&over_one, chunk_size);
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0].len(), chunk_size);
assert_eq!(chunks[1].len(), 1);
}
#[tokio::test]
async fn test_validation_cache_behavior() {
struct Cache {
data: Arc<RwLock<HashMap<Vec<u8>, bool>>>,
hits: Arc<RwLock<usize>>,
misses: Arc<RwLock<usize>>,
}
impl Cache {
fn new() -> Self {
Self {
data: Arc::new(RwLock::new(HashMap::new())),
hits: Arc::new(RwLock::new(0)),
misses: Arc::new(RwLock::new(0)),
}
}
async fn validate(&self, key: Vec<u8>) -> bool {
let cache = self.data.read().await;
if let Some(&result) = cache.get(&key) {
drop(cache);
*self.hits.write().await += 1;
return result;
}
drop(cache);
*self.misses.write().await += 1;
tokio::time::sleep(Duration::from_millis(10)).await;
let result = key.len() > 0 && key[0] % 2 == 0;
self.data.write().await.insert(key, result);
result
}
async fn stats(&self) -> (usize, usize) {
(*self.hits.read().await, *self.misses.read().await)
}
}
let cache = Cache::new();
let key1 = vec![2, 4, 6];
let key2 = vec![1, 3, 5];
let start = Instant::now();
assert!(cache.validate(key1.clone()).await);
let miss_time = start.elapsed();
let start = Instant::now();
assert!(cache.validate(key1.clone()).await);
let hit_time = start.elapsed();
assert!(hit_time < miss_time / 5, "Cache hit should be much faster");
let (hits, misses) = cache.stats().await;
assert_eq!(hits, 1);
assert_eq!(misses, 1);
assert!(!cache.validate(key2.clone()).await);
let (hits, misses) = cache.stats().await;
assert_eq!(hits, 1);
assert_eq!(misses, 2);
println!("Cache performance:");
println!(" Miss time: {:?}", miss_time);
println!(" Hit time: {:?}", hit_time);
println!(
" Speedup: {:.2}x",
miss_time.as_secs_f64() / hit_time.as_secs_f64()
);
}
#[tokio::test]
async fn test_concurrent_validation() {
use tokio::sync::Semaphore;
let semaphore = Arc::new(Semaphore::new(10)); let mut handles = vec![];
let start = Instant::now();
for i in 0..100 {
let sem = semaphore.clone();
let handle = tokio::spawn(async move {
let _permit = sem.acquire().await.unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
i % 2 == 0
});
handles.push(handle);
}
let mut results = vec![];
for handle in handles {
results.push(handle.await.unwrap());
}
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_millis(100));
assert_eq!(results.len(), 100);
println!("Concurrent validation:");
println!(" 100 tasks completed in {:?}", elapsed);
println!(
" Throughput: {:.0} validations/sec",
100.0 / elapsed.as_secs_f64()
);
}
#[test]
fn test_memory_overhead() {
use std::mem;
let original_size = 1_000_000; let chunk_size = 8192; let data = vec![0u8; original_size];
let chunks = chunk_message(&data, chunk_size);
let chunk_overhead = chunks.len() * mem::size_of::<Vec<u8>>();
println!("Memory overhead for chunking:");
println!(" Original size: {} bytes", original_size);
println!(" Number of chunks: {}", chunks.len());
println!(" Overhead per chunk: {} bytes", mem::size_of::<Vec<u8>>());
println!(
" Total overhead: {} bytes ({:.2}%)",
chunk_overhead,
(chunk_overhead as f64 / original_size as f64) * 100.0
);
assert!(chunk_overhead < original_size / 100); }
#[tokio::test]
async fn test_backpressure_handling() {
use tokio::sync::mpsc;
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(10);
let producer = tokio::spawn(async move {
for i in 0..100 {
let chunk = vec![i as u8; 1024];
if tx.send(chunk).await.is_err() {
break;
}
}
});
let consumer = tokio::spawn(async move {
let mut count = 0;
while let Some(_chunk) = rx.recv().await {
tokio::time::sleep(Duration::from_millis(10)).await;
count += 1;
if count >= 20 {
break; }
}
count
});
let _ = producer.await;
let processed = consumer.await.unwrap();
assert_eq!(processed, 20);
println!("Backpressure test: processed {} messages", processed);
}
}