use crate::common::TestDatabaseManager;
use anyhow::Result;
use codex_memory::{mcp_server::MCPHandlers, Storage};
use serde_json::json;
use serial_test::serial;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::timeout;
#[tokio::test]
#[serial]
async fn test_high_throughput_storage() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
let num_items = 1000;
let content_base = "High throughput test content item ";
println!("Testing storage of {} items...", num_items);
let start = Instant::now();
let mut handles = vec![];
for i in 0..num_items {
let storage_clone = storage.clone();
let content = format!("{}{}", content_base, i);
let handle = tokio::spawn(async move {
storage_clone
.store(
&content,
format!("Context for item {}", i),
format!("Summary of item {}", i),
Some(vec![format!("tag-{}", i), "stress-test".to_string()]),
)
.await
});
handles.push(handle);
}
let mut successes = 0;
let mut failures = 0;
for handle in handles {
match handle.await {
Ok(Ok(_)) => successes += 1,
Ok(Err(e)) => {
println!("Storage failed: {}", e);
failures += 1;
}
Err(e) => {
println!("Task failed: {}", e);
failures += 1;
}
}
}
let duration = start.elapsed();
let throughput = successes as f64 / duration.as_secs_f64();
println!("High throughput test results:");
println!(" Duration: {:?}", duration);
println!(" Successes: {}/{}", successes, num_items);
println!(" Failures: {}", failures);
println!(" Throughput: {:.2} items/second", throughput);
assert!(
successes > (num_items * 90 / 100),
"At least 90% should succeed"
);
assert!(throughput > 10.0, "Should achieve at least 10 items/second");
assert!(
duration < Duration::from_secs(120),
"Should complete within 2 minutes"
);
let stats = storage.stats().await?;
assert!(
stats.total_memories >= successes as i64,
"All successful items should be stored"
);
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
#[serial]
async fn test_memory_usage_under_load() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
let mut base_memory = get_memory_usage();
println!("Initial memory usage: {} bytes", base_memory);
let content_sizes = vec![1024, 10_240, 102_400, 1_024_000]; let items_per_size = 100;
for size in content_sizes {
println!("Testing {} items of {} bytes each", items_per_size, size);
let content = "x".repeat(size);
let mut handles = vec![];
let size_start = Instant::now();
for i in 0..items_per_size {
let storage_clone = storage.clone();
let content_clone = content.clone();
let handle = tokio::spawn(async move {
storage_clone
.store(
&content_clone,
format!("Context for {}B item #{}", size, i),
"Test summary".to_string(),
Some(vec![format!("size-{}", size)]),
)
.await
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
let size_duration = size_start.elapsed();
let current_memory = get_memory_usage();
let memory_increase = current_memory.saturating_sub(base_memory);
println!(" Duration: {:?}", size_duration);
println!(" Memory increase: {} bytes", memory_increase);
if memory_increase > 0 {
println!(
" Memory per item: {} bytes",
memory_increase / items_per_size as u64
);
} else {
println!(" Memory per item: 0 bytes (memory decreased or unchanged)");
}
let expected_data = size * items_per_size;
if memory_increase > 0 {
let memory_overhead_ratio = memory_increase as f64 / expected_data as f64;
println!(" Memory overhead ratio: {:.2}x", memory_overhead_ratio);
let threshold = if size <= 1_024 {
2000.0
} else if size <= 10_240 {
200.0
} else {
100.0
};
assert!(
memory_overhead_ratio < threshold,
"Memory overhead should not exceed {}x data size (was {:.2}x)",
threshold,
memory_overhead_ratio
);
} else {
println!(" Memory overhead ratio: N/A (memory decreased)");
}
base_memory = current_memory;
}
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_concurrent_read_write_performance() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
let mut stored_ids = vec![];
for i in 0..100 {
let id = storage
.store(
&format!("Pre-stored content {}", i),
format!("Context {}", i),
format!("Summary {}", i),
Some(vec![format!("prestored-{}", i)]),
)
.await?;
stored_ids.push(id);
}
println!("Testing concurrent read/write operations...");
let start = Instant::now();
let mut write_handles = vec![];
let mut read_handles = vec![];
for i in 0..50 {
let storage_clone = storage.clone();
let handle = tokio::spawn(async move {
storage_clone
.store(
&format!("Concurrent write {}", i),
format!("Write context {}", i),
"Test summary".to_string(),
Some(vec!["concurrent-write".to_string()]),
)
.await
});
write_handles.push(handle);
}
for i in 0..50 {
let storage_clone = storage.clone();
let ids_clone = stored_ids.clone();
let handle = tokio::spawn(async move {
let id = ids_clone[i % ids_clone.len()];
storage_clone.get(id).await
});
read_handles.push(handle);
}
let mut write_successes = 0;
let mut read_successes = 0;
let mut failures = 0;
for handle in write_handles {
match handle.await {
Ok(Ok(_)) => write_successes += 1,
Ok(Err(e)) => {
println!("Write operation failed: {}", e);
failures += 1;
}
Err(e) => {
println!("Write task failed: {}", e);
failures += 1;
}
}
}
for handle in read_handles {
match handle.await {
Ok(Ok(_)) => read_successes += 1,
Ok(Err(e)) => {
println!("Read operation failed: {}", e);
failures += 1;
}
Err(e) => {
println!("Read task failed: {}", e);
failures += 1;
}
}
}
let duration = start.elapsed();
let total_ops = write_successes + read_successes;
let ops_per_second = total_ops as f64 / duration.as_secs_f64();
println!("Concurrent R/W test results:");
println!(" Duration: {:?}", duration);
println!(" Write successes: {}/50", write_successes);
println!(" Read successes: {}/50", read_successes);
println!(" Failures: {}", failures);
println!(" Operations per second: {:.2}", ops_per_second);
assert!(write_successes > 45, "Most writes should succeed");
assert!(read_successes > 45, "Most reads should succeed");
assert!(
ops_per_second > 20.0,
"Should achieve reasonable throughput"
);
assert!(
duration < Duration::from_secs(30),
"Should complete quickly"
);
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_database_connection_pool_stress() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
println!("Stress testing database connection pool...");
let concurrent_operations = 100;
let mut handles = vec![];
let start = Instant::now();
for i in 0..concurrent_operations {
let storage_clone = storage.clone();
let handle = tokio::spawn(async move {
match i % 4 {
0 => {
storage_clone
.store(
&format!("Connection stress test {}", i),
"Test context".to_string(),
"Test summary".to_string(),
Some(vec!["connection-stress".to_string()]),
)
.await
}
1 => {
storage_clone.stats().await.map(|_| uuid::Uuid::new_v4())
}
2 => {
storage_clone.stats().await.map(|_| uuid::Uuid::new_v4())
}
3 => {
for _ in 0..3 {
let _ = storage_clone.stats().await;
}
Ok(uuid::Uuid::new_v4())
}
_ => unreachable!(),
}
});
handles.push(handle);
}
let mut successes = 0;
let mut failures = 0;
let mut timeouts = 0;
for handle in handles {
match timeout(Duration::from_secs(30), handle).await {
Ok(Ok(Ok(_))) => successes += 1,
Ok(Ok(Err(e))) => {
println!("Operation failed: {}", e);
failures += 1;
}
Ok(Err(e)) => {
println!("Task failed: {}", e);
failures += 1;
}
Err(_) => {
println!("Operation timed out");
timeouts += 1;
}
}
}
let duration = start.elapsed();
println!("Connection pool stress test results:");
println!(" Duration: {:?}", duration);
println!(" Successes: {}/{}", successes, concurrent_operations);
println!(" Failures: {}", failures);
println!(" Timeouts: {}", timeouts);
assert!(
successes > (concurrent_operations * 80 / 100),
"At least 80% of operations should succeed"
);
assert!(
timeouts < (concurrent_operations * 20 / 100),
"Should not have excessive timeouts"
);
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_mcp_handler_performance() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
let handlers = Arc::new(MCPHandlers::new(storage));
println!("Testing MCP handler performance...");
let num_requests = 200;
let mut handles = vec![];
let start = Instant::now();
for i in 0..num_requests {
let handlers_clone = handlers.clone();
let handle = tokio::spawn(async move {
let params = json!({
"content": format!("MCP performance test content {}", i),
"context": format!("Performance test context {}", i),
"summary": format!("Performance test summary {}", i),
"tags": [format!("mcp-perf-{}", i), "performance"]
});
handlers_clone
.handle_tool_call("store_memory", params)
.await
});
handles.push(handle);
}
let mut successes = 0;
let mut failures = 0;
for handle in handles {
match timeout(Duration::from_secs(60), handle).await {
Ok(Ok(Ok(_))) => successes += 1,
Ok(Ok(Err(e))) => {
println!("MCP request failed: {}", e);
failures += 1;
}
Ok(Err(e)) => {
println!("Task failed: {}", e);
failures += 1;
}
Err(_) => {
println!("MCP request timed out");
failures += 1;
}
}
}
let duration = start.elapsed();
let requests_per_second = successes as f64 / duration.as_secs_f64();
println!("MCP handler performance results:");
println!(" Duration: {:?}", duration);
println!(" Successes: {}/{}", successes, num_requests);
println!(" Failures: {}", failures);
println!(" Requests per second: {:.2}", requests_per_second);
assert!(
successes > (num_requests * 85 / 100),
"Most MCP requests should succeed"
);
assert!(
requests_per_second > 5.0,
"Should handle reasonable request rate"
);
assert!(
duration < Duration::from_secs(120),
"Should complete within reasonable time"
);
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_memory_deduplication_performance() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
println!("Testing memory deduplication performance...");
let duplicate_content = "This content will be stored multiple times to test deduplication performance and efficiency under various load conditions.";
let num_duplicates = 500;
let start = Instant::now();
let mut handles = vec![];
for i in 0..num_duplicates {
let storage_clone = storage.clone();
let content = duplicate_content.to_string();
let handle = tokio::spawn(async move {
storage_clone
.store(
&content,
format!("Duplicate test context {}", i),
format!("Duplicate test summary {}", i),
Some(vec![format!("dup-{}", i)]),
)
.await
});
handles.push(handle);
}
let mut first_id = None;
let mut successes = 0;
let mut dedup_count = 0;
for handle in handles {
match handle.await {
Ok(Ok(id)) => {
successes += 1;
if let Some(first) = first_id {
if first == id {
dedup_count += 1;
}
} else {
first_id = Some(id);
}
}
Ok(Err(e)) => println!("Duplicate store failed: {}", e),
Err(e) => println!("Task failed: {}", e),
}
}
let duration = start.elapsed();
let dedup_ratio = dedup_count as f64 / successes as f64;
println!("Deduplication performance results:");
println!(" Duration: {:?}", duration);
println!(" Successes: {}/{}", successes, num_duplicates);
println!(" Deduplication hits: {}", dedup_count);
println!(" Deduplication ratio: {:.2}%", dedup_ratio * 100.0);
assert!(
dedup_ratio > 0.95,
"Should achieve high deduplication ratio"
);
assert!(
duration < Duration::from_secs(60),
"Deduplication should be fast"
);
let stats = storage.stats().await?;
assert_eq!(
stats.total_memories, 1,
"Should only have one deduplicated record"
);
manager.cleanup().await?;
Ok(())
}
#[tokio::test]
async fn test_large_content_processing_performance() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
let base_content = "This is test content that will be repeated to create large payloads for performance testing. ";
let sizes = vec![
(100, "Small"), (1000, "Medium"), (10000, "Large"), (50000, "XLarge"), ];
for (multiplier, size_name) in sizes {
let content = base_content.repeat(multiplier);
println!("Testing {} content ({} chars)...", size_name, content.len());
let start = Instant::now();
let result = storage
.store(
&content,
format!("{} content context", size_name),
format!("{} content summary", size_name),
Some(vec![format!("size-{}", size_name.to_lowercase())]),
)
.await;
let store_duration = start.elapsed();
match result {
Ok(id) => {
println!(" Store time: {:?}", store_duration);
let retrieve_start = Instant::now();
let retrieved = storage.get(id).await?;
let retrieve_duration = retrieve_start.elapsed();
println!(" Retrieve time: {:?}", retrieve_duration);
assert!(retrieved.is_some(), "Should retrieve large content");
assert_eq!(retrieved.unwrap().content.len(), content.len());
let max_store_time = Duration::from_secs(30);
let max_retrieve_time = Duration::from_secs(10);
assert!(
store_duration < max_store_time,
"{} content store should complete within {:?}",
size_name,
max_store_time
);
assert!(
retrieve_duration < max_retrieve_time,
"{} content retrieval should complete within {:?}",
size_name,
max_retrieve_time
);
}
Err(e) => {
println!(" {} content failed: {}", size_name, e);
if multiplier < 10000 {
panic!("{} content should not fail: {}", size_name, e);
}
}
}
println!();
}
manager.cleanup().await?;
Ok(())
}
fn get_memory_usage() -> u64 {
std::process::Command::new("ps")
.args(["-o", "rss=", "-p", &std::process::id().to_string()])
.output()
.ok()
.and_then(|output| String::from_utf8(output.stdout).ok())
.and_then(|s| s.trim().parse::<u64>().ok())
.unwrap_or(0)
* 1024 }