use shardex::{
AsyncDocumentTextStorage, AsyncStorageConfig, ConcurrentDocumentTextStorage, ConcurrentStorageConfig, DocumentId,
DocumentTextStorage, MemoryPoolConfig, MonitoringPerformanceMonitor, ShardexError, TextMemoryPool,
};
use std::time::{Duration, Instant};
use tempfile::TempDir;
use tokio::time::timeout;
mod integration_utils {
    use super::*;

    /// Create a temporary directory and a 10 MiB `DocumentTextStorage` backed by it.
    /// The `TempDir` is returned so the caller keeps the directory alive.
    pub fn create_test_storage() -> (TempDir, DocumentTextStorage) {
        let dir = TempDir::new().unwrap();
        let storage = DocumentTextStorage::create(&dir, 10 * 1024 * 1024).unwrap();
        (dir, storage)
    }

    /// Alias of [`create_test_storage`] for call sites that bind the storage mutably.
    pub fn create_test_storage_mut() -> (TempDir, DocumentTextStorage) {
        create_test_storage()
    }

    /// Build `count` documents whose bodies are roughly `size` bytes of filler text.
    pub fn generate_test_documents(count: usize, size: usize) -> Vec<(DocumentId, String)> {
        let mut docs = Vec::with_capacity(count);
        for i in 0..count {
            let filler = "Lorem ipsum ".repeat(size / 12);
            docs.push((DocumentId::new(), format!("Test document {} content: {}", i, filler)));
        }
        docs
    }

    /// Store every `(id, text)` pair through `store_fn`, stopping at the first error.
    #[allow(dead_code)]
    pub async fn populate_storage<S>(
        storage: &S,
        documents: &[(DocumentId, String)],
        store_fn: impl Fn(&S, DocumentId, &str) -> Result<(), ShardexError> + Send + Sync,
    ) -> Result<(), ShardexError> {
        documents
            .iter()
            .try_for_each(|(doc_id, text)| store_fn(storage, *doc_id, text))
    }
}
#[tokio::test]
async fn test_optimized_memory_mapping_integration() {
    // Populate the base storage with 100 ~1 KiB documents.
    let (_temp_dir, mut base_storage) = integration_utils::create_test_storage_mut();
    let documents = integration_utils::generate_test_documents(100, 1024);
    for (id, body) in &documents {
        base_storage.store_text(*id, body).unwrap();
    }

    // Re-read the tail of the document set repeatedly and time the whole pass;
    // this exercises repeated memory-mapped access to recently-written data.
    let started = Instant::now();
    for _round in 0..5 {
        for (id, _) in &documents[90..] {
            let text = base_storage.get_text(*id).unwrap();
            assert!(!text.is_empty());
        }
    }
    let base_duration = started.elapsed();

    println!("Base storage repeated access took: {:?}", base_duration);
    // Loose wall-clock bound so the test stays stable on slow hosts.
    assert!(
        base_duration < Duration::from_secs(5),
        "Base operations should complete in reasonable time"
    );
}
#[tokio::test]
async fn test_concurrent_storage_integration() {
    // Wrap a fresh base storage in the concurrent layer and start its
    // background write processor before issuing any operations.
    let (_temp_dir, base_storage) = integration_utils::create_test_storage();
    let config = ConcurrentStorageConfig::default();
    let concurrent_storage = ConcurrentDocumentTextStorage::new(base_storage, config);
    concurrent_storage
        .start_background_processor()
        .await
        .unwrap();
    let documents = integration_utils::generate_test_documents(50, 2048);
    let storage_arc = std::sync::Arc::new(concurrent_storage);
    // Docs [0..25): immediate writes issued from 25 parallel tasks.
    let mut write_tasks = Vec::new();
    for (doc_id, text) in &documents[0..25] {
        let storage: std::sync::Arc<ConcurrentDocumentTextStorage> = std::sync::Arc::clone(&storage_arc);
        let doc_id = *doc_id;
        let text = text.clone();
        write_tasks.push(tokio::spawn(async move {
            storage.store_text_immediate(doc_id, &text).await.unwrap();
        }));
    }
    // Docs [25..35): immediate writes performed inline, so these documents
    // are fully stored before the concurrent reads below are spawned.
    for (doc_id, text) in &documents[25..35] {
        storage_arc
            .store_text_immediate(*doc_id, text)
            .await
            .unwrap();
    }
    // Concurrent reads of the inline-stored docs, deliberately racing the
    // still-running write tasks for [0..25) to exercise mixed traffic.
    let mut read_tasks = Vec::new();
    for (doc_id, expected_text) in &documents[25..35] {
        let storage: std::sync::Arc<ConcurrentDocumentTextStorage> = std::sync::Arc::clone(&storage_arc);
        let doc_id = *doc_id;
        let expected_text = expected_text.clone();
        read_tasks.push(tokio::spawn(async move {
            let retrieved = storage.get_text_concurrent(doc_id).await.unwrap();
            assert_eq!(retrieved, expected_text);
        }));
    }
    for task in write_tasks {
        task.await.unwrap();
    }
    for task in read_tasks {
        task.await.unwrap();
    }
    // Docs [35..45): batched (queued) writes from parallel tasks.
    let batch_docs: Vec<_> = documents[35..45]
        .iter()
        .map(|(id, text)| (*id, text.clone()))
        .collect();
    let mut batch_tasks = Vec::new();
    for (doc_id, text) in batch_docs {
        let storage: std::sync::Arc<ConcurrentDocumentTextStorage> = std::sync::Arc::clone(&storage_arc);
        batch_tasks.push(tokio::spawn(async move {
            storage.store_text_batched(doc_id, text).await.unwrap();
        }));
    }
    for task in batch_tasks {
        task.await.unwrap();
    }
    // Verify all 45 documents written so far.
    // NOTE(review): this reads batched docs before flush_write_queue() below —
    // assumes get_text_concurrent sees queued writes (read-through); confirm
    // against ConcurrentDocumentTextStorage semantics.
    for (doc_id, expected_text) in &documents[0..45] {
        let retrieved = storage_arc.get_text_concurrent(*doc_id).await.unwrap();
        assert_eq!(retrieved, *expected_text);
    }
    // Metrics should reflect both read and write traffic.
    let metrics = storage_arc.get_metrics();
    assert!(metrics.read_operations > 0);
    assert!(metrics.write_operations > 0);
    assert!(metrics.successful_reads > 0);
    assert!(metrics.successful_writes > 0);
    // Drain any queued writes, then stop the background processor cleanly.
    storage_arc.flush_write_queue().await.unwrap();
    storage_arc.stop_background_processor().await.unwrap();
}
#[tokio::test]
async fn test_text_memory_pool_integration() {
    // Pool tuned for this test: 100 pooled buffers max, 64 KiB cap per buffer.
    let config = MemoryPoolConfig {
        max_pool_size: 100,
        max_buffer_capacity: 64 * 1024,
        buffer_ttl: Duration::from_secs(60),
        growth_factor: 1.5,
    };
    let pool = TextMemoryPool::new(config);
    pool.prewarm(20, 4096, 10, 8192);

    // Round one: check out 50 string and 50 byte buffers of varying sizes
    // and write distinct content into each.
    let mut string_buffers = Vec::new();
    let mut byte_buffers = Vec::new();
    for i in 0..50 {
        let mut sbuf = pool.get_string_buffer(1024 + (i * 100));
        sbuf.buffer_mut().push_str(&format!("Test string buffer {}", i));
        string_buffers.push(sbuf);

        let mut bbuf = pool.get_byte_buffer(2048 + (i * 50));
        bbuf.buffer_mut()
            .extend_from_slice(format!("Test byte buffer {}", i).as_bytes());
        byte_buffers.push(bbuf);
    }

    // Dropping the guards returns the buffers to the pool.
    drop(string_buffers);
    drop(byte_buffers);
    let stats_after_first_round = pool.get_stats();
    println!(
        "After first round - String pool: {}, Byte pool: {}",
        stats_after_first_round.string_pool_size, stats_after_first_round.byte_pool_size
    );

    // Round two: reused buffers must be handed back empty.
    let mut second_string_buffers = Vec::new();
    for i in 0..30 {
        let mut sbuf = pool.get_string_buffer(1024);
        assert!(sbuf.is_empty(), "Pooled buffer should be cleared");
        sbuf.buffer_mut().push_str(&format!("Reused string buffer {}", i));
        second_string_buffers.push(sbuf);
    }

    let final_stats = pool.get_stats();
    println!(
        "Final stats - Hit ratio: {:.2}, Total requests: {}",
        final_stats.hit_ratio(),
        final_stats.total_requests
    );
    assert!(final_stats.hit_ratio() > 0.0, "Should have some cache hits");
    assert!(final_stats.total_requests > 100, "Should have processed many requests");

    // A request larger than max_buffer_capacity must still succeed.
    pool.cleanup();
    let oversized_buffer = pool.get_string_buffer(100 * 1024);
    assert!(oversized_buffer.capacity() >= 100 * 1024);
    drop(oversized_buffer);

    let efficiency_stats = pool.get_stats();
    assert!(efficiency_stats.memory_efficiency() >= 0.0);
}
#[tokio::test]
async fn test_async_storage_integration() {
    // Async wrapper over a fresh base storage with default configuration.
    let (_temp_dir, base_storage) = integration_utils::create_test_storage();
    let config = AsyncStorageConfig::default();
    let async_storage = AsyncDocumentTextStorage::new(base_storage, config)
        .await
        .unwrap();
    let documents = integration_utils::generate_test_documents(30, 1024);
    // Docs [0..10): individual async store followed by read-back verification.
    for (doc_id, text) in &documents[0..10] {
        async_storage
            .store_text_async(*doc_id, text.clone())
            .await
            .unwrap();
    }
    for (doc_id, expected_text) in &documents[0..10] {
        let retrieved = async_storage.get_text_async(*doc_id).await.unwrap();
        assert_eq!(retrieved, *expected_text);
    }
    // Read the same doc twice and log both latencies; the second read is
    // expected (but deliberately not asserted) to be faster via caching.
    let doc_id = documents[0].0;
    let start_time = Instant::now();
    let _ = async_storage.get_text_async(doc_id).await.unwrap();
    let first_read_time = start_time.elapsed();
    let start_time = Instant::now();
    let _ = async_storage.get_text_async(doc_id).await.unwrap();
    let second_read_time = start_time.elapsed();
    println!("First read: {:?}, Second read: {:?}", first_read_time, second_read_time);
    // Docs [10..20): batch store API; every per-document result must be Ok.
    let batch_documents: Vec<_> = documents[10..20]
        .iter()
        .map(|(id, text)| (*id, text.clone()))
        .collect();
    let batch_results = async_storage
        .store_texts_batch_async(batch_documents.clone())
        .await
        .unwrap();
    for result in batch_results {
        assert!(result.is_ok());
    }
    for (doc_id, expected_text) in &batch_documents {
        let retrieved = async_storage.get_text_async(*doc_id).await.unwrap();
        assert_eq!(retrieved, *expected_text);
    }
    // Substring extraction: first 100 bytes of a freshly stored document.
    let large_doc_id = documents[20].0;
    let large_text = &documents[20].1;
    async_storage
        .store_text_async(large_doc_id, large_text.clone())
        .await
        .unwrap();
    let substring = async_storage
        .extract_text_substring_async(large_doc_id, 0, 100)
        .await
        .unwrap();
    assert_eq!(substring, large_text[0..100]);
    // A 1 ms external timeout may or may not expire; only log the outcome,
    // since the read can legitimately complete in under a millisecond.
    let timeout_result = timeout(
        Duration::from_millis(1), async_storage.get_text_async(documents[0].0),
    )
    .await;
    println!("Timeout test result: {:?}", timeout_result.is_err());
    // Read-ahead buffer introspection: capacity must be configured (> 0).
    let (buffer_size, buffer_capacity) = async_storage.read_ahead_info().await;
    println!("Read-ahead buffer: {}/{}", buffer_size, buffer_capacity);
    assert!(buffer_capacity > 0);
    // Docs [25..30): store, then pre-warm the read-ahead buffer with them.
    let warm_doc_ids: Vec<_> = documents[25..30].iter().map(|(id, _)| *id).collect();
    for (doc_id, text) in &documents[25..30] {
        async_storage
            .store_text_async(*doc_id, text.clone())
            .await
            .unwrap();
    }
    async_storage
        .warm_read_ahead_buffer(warm_doc_ids.clone())
        .await
        .unwrap();
    // Metrics must reflect async traffic before clean shutdown.
    let metrics = async_storage.get_metrics();
    assert!(metrics.async_reads > 0);
    assert!(metrics.async_writes > 0);
    assert!(metrics.total_async_operations() > 0);
    async_storage.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_full_integration_stack() {
    // End-to-end exercise of the performance stack: memory pool plus a
    // tuned concurrent storage layer under parallel batch load.
    let (_temp_dir, base_storage) = integration_utils::create_test_storage();
    let memory_pool = TextMemoryPool::new_default();
    memory_pool.prewarm(50, 2048, 25, 4096);
    let concurrent_config = ConcurrentStorageConfig {
        max_batch_size: 20,
        write_timeout: Duration::from_secs(10),
        metadata_cache_size: 1000,
        batch_interval: Duration::from_millis(50),
        max_concurrent_ops: 50,
    };
    let concurrent_storage = ConcurrentDocumentTextStorage::new(base_storage, concurrent_config);
    concurrent_storage
        .start_background_processor()
        .await
        .unwrap();
    let concurrent_storage_arc = std::sync::Arc::new(concurrent_storage);
    // Documents the async-layer tuning this test would use if it also built
    // an AsyncDocumentTextStorage; currently unused by design.
    let _async_config = AsyncStorageConfig {
        concurrent_config: ConcurrentStorageConfig::default(),
        read_ahead_buffer_size: 500,
        read_ahead_ttl: Duration::from_secs(300),
        max_concurrent_async_ops: 100,
        default_timeout: Duration::from_secs(30),
        read_ahead_window: 10,
        cleanup_interval: Duration::from_secs(60),
        max_access_history: 1000,
        prediction_temporal_window: Duration::from_secs(1800),
        max_cooccurrence_patterns: 50,
        prediction_count: 5,
    };
    let documents = integration_utils::generate_test_documents(100, 2048);
    // Store all documents in parallel batches of 10, staging each text
    // through a pooled string buffer.
    let start_time = Instant::now();
    let mut batch_tasks = Vec::new();
    let batch_size = 10;
    for batch_start in (0..documents.len()).step_by(batch_size) {
        let batch_end = (batch_start + batch_size).min(documents.len());
        let batch = documents[batch_start..batch_end].to_vec();
        let storage: std::sync::Arc<ConcurrentDocumentTextStorage> = std::sync::Arc::clone(&concurrent_storage_arc);
        batch_tasks.push(tokio::spawn(async move {
            // One pool per task. The previous version constructed a fresh
            // pool for every document inside the loop below, which defeated
            // buffer reuse entirely.
            let pool = TextMemoryPool::new_default();
            for (doc_id, text) in batch {
                let mut buffer = pool.get_string_buffer(text.len());
                buffer.buffer_mut().push_str(&text);
                let processed_text = buffer.buffer().clone();
                storage
                    .store_text_batched(doc_id, processed_text)
                    .await
                    .unwrap();
            }
        }));
    }
    for task in batch_tasks {
        task.await.unwrap();
    }
    let storage_duration = start_time.elapsed();
    println!(
        "Full stack storage of {} documents took: {:?}",
        documents.len(),
        storage_duration
    );
    // Read every document back concurrently, verifying contents and
    // accumulating the total payload size.
    let start_time = Instant::now();
    let mut read_tasks = Vec::new();
    for (doc_id, expected_text) in &documents {
        let storage: std::sync::Arc<ConcurrentDocumentTextStorage> = std::sync::Arc::clone(&concurrent_storage_arc);
        let doc_id = *doc_id;
        let expected_text = expected_text.clone();
        read_tasks.push(tokio::spawn(async move {
            let retrieved = storage.get_text_concurrent(doc_id).await.unwrap();
            assert_eq!(retrieved, expected_text);
            retrieved.len()
        }));
    }
    let mut total_bytes = 0;
    for task in read_tasks {
        total_bytes += task.await.unwrap();
    }
    let retrieval_duration = start_time.elapsed();
    println!(
        "Full stack retrieval of {} documents ({} bytes) took: {:?}",
        documents.len(),
        total_bytes,
        retrieval_duration
    );
    // Generous wall-clock bounds keep the test stable on slow CI hosts.
    assert!(
        storage_duration < Duration::from_secs(30),
        "Storage should be reasonably fast"
    );
    assert!(
        retrieval_duration < Duration::from_secs(30),
        "Retrieval should be reasonably fast"
    );
    let concurrent_metrics = concurrent_storage_arc.get_metrics();
    let memory_pool_stats = memory_pool.get_stats();
    println!("Integration test results:");
    println!(" Concurrent operations: {}", concurrent_metrics.total_operations());
    println!(
        " Concurrent success rate: {:.2}%",
        concurrent_metrics.read_success_ratio() * 100.0
    );
    println!(" Memory pool hit rate: {:.2}%", memory_pool_stats.hit_ratio() * 100.0);
    println!(
        " Cache hit ratio: {:.2}%",
        concurrent_metrics.metadata_cache_hit_ratio() * 100.0
    );
    assert!(
        concurrent_metrics.read_success_ratio() >= 0.95,
        "Should have >95% read success rate"
    );
    assert!(
        concurrent_metrics.write_success_ratio() >= 0.95,
        "Should have >95% write success rate"
    );
    assert!(
        concurrent_metrics.total_operations() > 0,
        "Should have processed operations"
    );
    // Drain queued writes before stopping the background processor.
    concurrent_storage_arc.flush_write_queue().await.unwrap();
    concurrent_storage_arc
        .stop_background_processor()
        .await
        .unwrap();
}
#[tokio::test]
async fn test_error_handling_integration() {
    let (_temp_dir, base_storage) = integration_utils::create_test_storage();
    let concurrent_storage = ConcurrentDocumentTextStorage::new(base_storage, ConcurrentStorageConfig::default());
    concurrent_storage
        .start_background_processor()
        .await
        .unwrap();

    // Reading a document that was never stored must fail.
    let invalid_doc_id = DocumentId::new();
    let result = concurrent_storage.get_text_concurrent(invalid_doc_id).await;
    assert!(result.is_err(), "Should fail to read non-existent document");

    // Substring range extending past the end of a short document; the
    // expected behavior (error vs. clamp) is only logged, not asserted.
    let doc_id = DocumentId::new();
    let text = "Short text";
    concurrent_storage
        .store_text_immediate(doc_id, text)
        .await
        .unwrap();
    let result = concurrent_storage
        .extract_text_substring_concurrent(doc_id, 0, 1000)
        .await;
    println!("Invalid substring extraction result: {:?}", result.is_err());

    // Bind the TempDir to a local so it outlives the storage that uses it.
    // Previously `TempDir::new().unwrap()` was a temporary passed by value,
    // dropped (and the directory deleted) at the end of the construction
    // statement while async_storage still referenced files inside it.
    let async_temp_dir = TempDir::new().unwrap();
    let async_storage = AsyncDocumentTextStorage::new(
        DocumentTextStorage::create(&async_temp_dir, 1024 * 1024).unwrap(),
        AsyncStorageConfig {
            default_timeout: Duration::from_millis(1),
            ..AsyncStorageConfig::default()
        },
    )
    .await
    .unwrap();

    // With a 1 ms default timeout the store may or may not complete in time;
    // only the outcome is logged.
    let doc_id = DocumentId::new();
    let result = async_storage
        .store_text_async(doc_id, "test".to_string())
        .await;
    println!("Timeout test store result: {:?}", result.is_ok());

    async_storage.shutdown().await.unwrap();
    concurrent_storage
        .stop_background_processor()
        .await
        .unwrap();
}
#[tokio::test]
async fn test_performance_monitoring_integration() {
    let monitor = MonitoringPerformanceMonitor::new();
    let started = Instant::now();

    // Writes with varying latency and payload size; every 10th one fails.
    for i in 0..10 {
        let latency = Duration::from_millis(10 + (i % 5));
        let bytes = 1024 + (i * 100);
        monitor.record_write(latency, bytes, i % 10 != 9).await;
    }

    // Simulated reads: only the operations counter is exercised here; the
    // per-read latency/success values are intentionally unused.
    for i in 0..15 {
        let _latency = Duration::from_millis(5 + (i % 3));
        let _success = i % 15 != 14;
        monitor.increment_operations_counter();
    }

    // Bloom-filter lookups: two of every three are hits.
    for i in 0..20 {
        let lookup_time = Duration::from_nanos(100 + (i as u64 * 10));
        monitor
            .record_bloom_filter_lookup(i % 3 != 0, lookup_time, false)
            .await;
    }

    // Operations at several simulated concurrency levels.
    for _concurrency in [1, 2, 5, 10] {
        monitor.increment_operations_counter();
    }

    // Byte accounting for a mix of cache hits and misses.
    for (_hit, size) in [(true, 1024), (false, 2048), (true, 1024), (true, 4096)] {
        monitor.add_bytes_written(size as u64);
    }

    // Mixed success/failure writes via the explicit counters; the 8th fails.
    for i in 0..8 {
        let _latency = Duration::from_millis(20 + (i % 4) * 5);
        monitor.increment_operations_counter();
        if i % 8 != 7 {
            monitor.increment_successful_writes();
        } else {
            monitor.increment_failed_writes();
        }
    }

    monitor.increment_operations_counter();
    monitor.increment_operations_counter();

    let total_duration = started.elapsed();
    println!("Monitoring integration test completed in: {:?}", total_duration);
    assert!(total_duration < Duration::from_secs(5), "Monitoring should be fast");

    // Aggregated stats must show uptime and the recorded traffic.
    let stats = monitor.get_detailed_stats().await;
    assert!(stats.uptime > Duration::ZERO);
    assert!(stats.total_operations > 0);
}
#[tokio::test]
async fn test_component_compatibility() {
    // Instantiate every performance component side by side; the pool and
    // monitor only need to coexist, the storage does a real round trip.
    let (_temp_dir, base_storage) = integration_utils::create_test_storage();
    let _memory_pool = TextMemoryPool::new_default();
    let concurrent_storage = ConcurrentDocumentTextStorage::new(base_storage, ConcurrentStorageConfig::default());
    concurrent_storage
        .start_background_processor()
        .await
        .unwrap();
    let _monitor = MonitoringPerformanceMonitor::new();

    // One store/read round trip proves the components wire together.
    let doc_id = DocumentId::new();
    let text = "Compatibility test document";
    concurrent_storage
        .store_text_immediate(doc_id, text)
        .await
        .unwrap();
    let retrieved = concurrent_storage
        .get_text_concurrent(doc_id)
        .await
        .unwrap();
    assert_eq!(retrieved, text);

    concurrent_storage
        .stop_background_processor()
        .await
        .unwrap();
    println!("All performance components are compatible and functional");
}