use crate::common::TestDatabaseManager;
use anyhow::Result;
use codex_memory::{mcp_server::MCPHandlers, Storage};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
/// Checks that the storage layer keeps accepting writes across a retry loop.
///
/// Flow: store and read back a baseline record, then try up to five stores
/// (each bounded by a 10-second timeout, with a short pause after a failure or
/// a timeout) until one succeeds and can be read back, and finally store and
/// read back one more record. This test does not inject a fault itself; the
/// loop only exercises the retry path.
#[tokio::test]
async fn test_database_connection_recovery() -> Result<()> {
    let mut manager = TestDatabaseManager::new()?;
    let pool = manager.setup_test_database().await?;
    let storage = Arc::new(Storage::new(pool));

    // Baseline: a plain store/get round trip must work before anything else.
    let baseline_id = storage
        .store(
            "Pre-interruption content",
            "Test context".to_string(),
            "Test summary".to_string(),
            None,
        )
        .await?;
    let baseline = storage
        .get(baseline_id)
        .await?
        .expect("Should retrieve pre-interruption");
    assert_eq!(baseline.content, "Pre-interruption content");
    println!("✅ Normal operation confirmed");
    println!("Testing resilience to connection issues...");

    // Retry until one store succeeds; attempts are numbered from 1 in the output.
    let mut recovered = false;
    for attempt_no in 1..=5 {
        println!("Recovery attempt #{}", attempt_no);
        let payload = format!("Recovery test content attempt {}", attempt_no);
        let outcome = timeout(
            Duration::from_secs(10),
            storage.store(
                &payload,
                format!("Context for attempt {}", attempt_no),
                "Test summary".to_string(),
                Some(vec!["recovery-test".to_string()]),
            ),
        )
        .await;

        // Outer `Err` means the timeout elapsed; inner `Err` means the store failed.
        let stored_id = match outcome {
            Err(_) => {
                println!(" ⏰ Attempt {} timed out", attempt_no);
                tokio::time::sleep(Duration::from_millis(1000)).await;
                continue;
            }
            Ok(Err(e)) => {
                println!(" ❌ Attempt {} failed: {}", attempt_no, e);
                tokio::time::sleep(Duration::from_millis(500)).await;
                continue;
            }
            Ok(Ok(id)) => id,
        };

        println!(" ✅ Attempt {} succeeded", attempt_no);
        let retrieved = storage
            .get(stored_id)
            .await?
            .expect("Should retrieve recovery content");
        assert_eq!(retrieved.content, payload);
        recovered = true;
        break;
    }
    assert!(
        recovered,
        "Should eventually recover from connection issues"
    );

    // One more round trip after the retry loop.
    let final_id = storage
        .store(
            "Post-recovery content",
            "Test context".to_string(),
            "Test summary".to_string(),
            None,
        )
        .await?;
    let final_record = storage
        .get(final_id)
        .await?
        .expect("Should retrieve post-recovery");
    assert_eq!(final_record.content, "Post-recovery content");
    println!("✅ Connection recovery test completed");

    manager.cleanup().await?;
    Ok(())
}
/// Stores a series of awkward payloads and checks that the memory count stays
/// consistent whatever the outcome.
///
/// For each payload (a large repeated string, a string containing a NUL byte
/// and control characters, and a 10,000,000-byte string) the test records the
/// memory count, attempts the store under a 30-second timeout, and then:
/// - on success: the content length round-trips and the count grew by one;
/// - on a store error: the count is unchanged and a follow-up store works;
/// - on timeout: `stats()` still answers and the count has not dropped.
#[tokio::test]
async fn test_transaction_rollback_recovery() -> Result<()> {
    let mut manager = TestDatabaseManager::new()?;
    let pool = manager.setup_test_database().await?;
    let storage = Arc::new(Storage::new(pool));

    let _seed_id = storage
        .store(
            "Initial content",
            "Test context".to_string(),
            "Test summary".to_string(),
            None,
        )
        .await?;
    let baseline_stats = storage.stats().await?;
    println!("Initial state: {} memories", baseline_stats.total_memories);

    let payloads = [
        "Large content ".repeat(100000),
        "Content with \0 null bytes and control chars \x01\x02".to_string(),
        "x".repeat(10_000_000),
    ];

    let mut stored_ok = 0;
    let mut stored_failed = 0;
    for (idx, payload) in payloads.iter().enumerate() {
        println!(
            "Testing problematic content #{} ({} chars)",
            idx,
            payload.len()
        );
        // Snapshot the count so each outcome can be checked against it.
        let count_before = storage.stats().await?.total_memories;
        let outcome = timeout(
            Duration::from_secs(30),
            storage.store(
                payload,
                format!("Problematic context {}", idx),
                "Test summary".to_string(),
                Some(vec!["problematic".to_string()]),
            ),
        )
        .await;

        match outcome {
            // The 30-second timeout elapsed before the store finished.
            Err(_) => {
                println!(" ⏰ Problematic content #{} timed out", idx);
                stored_failed += 1;
                let stats_after = storage.stats().await?;
                assert!(
                    stats_after.total_memories >= count_before,
                    "Database should remain accessible after timeout"
                );
            }
            // The store itself reported an error: nothing may have been added.
            Ok(Err(e)) => {
                println!(" ❌ Problematic content #{} failed (expected): {}", idx, e);
                stored_failed += 1;
                let count_after = storage.stats().await?.total_memories;
                assert_eq!(
                    count_after, count_before,
                    "Failed transaction should not affect memory count"
                );

                // A normal store must still work right after the failure.
                let recovery_text = format!("Recovery after failure #{}", idx);
                let recovery_id = storage
                    .store(
                        &recovery_text,
                        "Test context".to_string(),
                        "Test summary".to_string(),
                        None,
                    )
                    .await?;
                let recovered = storage
                    .get(recovery_id)
                    .await?
                    .expect("Should retrieve recovery content");
                assert_eq!(recovered.content, recovery_text);
            }
            Ok(Ok(id)) => {
                println!(" ✅ Problematic content #{} stored successfully", idx);
                stored_ok += 1;
                let fetched = storage
                    .get(id)
                    .await?
                    .expect("Should retrieve problematic content");
                assert_eq!(fetched.content.len(), payload.len());
                let count_after = storage.stats().await?.total_memories;
                assert_eq!(count_after, count_before + 1, "Memory count should increase");
            }
        }
        println!(" Database remains responsive");
    }

    println!("Transaction rollback test results:");
    println!(" Successful stores: {}", stored_ok);
    println!(" Failed stores: {}", stored_failed);
    let closing_stats = storage.stats().await?;
    println!(" Final memory count: {}", closing_stats.total_memories);

    manager.cleanup().await?;
    Ok(())
}
#[tokio::test]
async fn test_concurrent_failure_isolation() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
println!("Testing concurrent failure isolation...");
let mut handles = vec![];
for i in 0..20 {
let storage_clone = storage.clone();
let handle = tokio::spawn(async move {
if i % 4 == 0 {
let problematic_content = if i % 8 == 0 {
"x".repeat(5_000_000) } else {
format!("Content with null \0 bytes #{}", i)
};
storage_clone
.store(
&problematic_content,
format!("Problematic context {}", i),
"Test summary".to_string(),
Some(vec!["problematic".to_string()]),
)
.await
} else {
storage_clone
.store(
&format!("Normal content {}", i),
format!("Normal context {}", i),
format!("Normal summary {}", i),
Some(vec!["normal".to_string()]),
)
.await
}
});
handles.push(handle);
}
let mut normal_successes = 0;
let mut normal_failures = 0;
let mut problematic_successes = 0;
let mut problematic_failures = 0;
for (i, handle) in handles.into_iter().enumerate() {
match timeout(Duration::from_secs(30), handle).await {
Ok(Ok(Ok(_))) => {
if i % 4 == 0 {
problematic_successes += 1;
} else {
normal_successes += 1;
}
}
Ok(Ok(Err(e))) => {
if i % 4 == 0 {
println!(" Expected problematic failure #{}: {}", i, e);
problematic_failures += 1;
} else {
println!(" Unexpected normal failure #{}: {}", i, e);
normal_failures += 1;
}
}
Ok(Err(e)) => {
println!(" Task #{} failed: {}", i, e);
}
Err(_) => {
println!(" Operation #{} timed out", i);
}
}
}
println!("Concurrent failure isolation results:");
println!(
" Normal operations: {} succeeded, {} failed",
normal_successes, normal_failures
);
println!(
" Problematic operations: {} succeeded, {} failed",
problematic_successes, problematic_failures
);
assert!(
normal_successes > 12,
"Most normal operations should succeed"
);
assert!(normal_failures < 3, "Few normal operations should fail");
let recovery_id = storage
.store(
"Post-concurrent-test content",
"Test context".to_string(),
"Test summary".to_string(),
None,
)
.await?;
let recovery_content = storage
.get(recovery_id)
.await?
.expect("Should retrieve recovery content");
assert_eq!(recovery_content.content, "Post-concurrent-test content");
manager.cleanup().await?;
Ok(())
}
/// Sends malformed `store_memory` tool calls through `MCPHandlers` and checks
/// that a well-formed call still succeeds after each one.
///
/// Steps:
/// 1. A valid baseline call must succeed.
/// 2. For every invalid parameter set: if the call returns an error, its
///    message (lowercased) must contain one of "missing", "invalid", "type",
///    "parameter" or "too large". An unexpected success is only logged.
/// 3. After each invalid call a valid "recovery" call is issued; when its
///    response carries a string `id`, the stored content is read back and
///    compared.
/// 4. The number of successful recoveries must equal the number of invalid
///    requests. That number is taken from the list itself so that adding or
///    removing a case cannot desynchronise the final assertion.
#[tokio::test]
async fn test_mcp_error_handling_recovery() -> Result<()> {
    let mut manager = TestDatabaseManager::new()?;
    let pool = manager.setup_test_database().await?;
    let storage = Arc::new(Storage::new(pool));
    let handlers = MCPHandlers::new(storage.clone());
    println!("Testing MCP error handling and recovery...");

    let valid_params = json!({
        "content": "Valid content for baseline",
        "context": "Baseline test context",
        "summary": "Baseline test summary",
        "tags": ["baseline"]
    });
    let valid_result = handlers
        .handle_tool_call("store_memory", valid_params)
        .await;
    assert!(valid_result.is_ok(), "Baseline request should succeed");

    let invalid_requests = vec![
        (json!({}), "empty parameters"),
        (json!({"content": null}), "null content"),
        (
            json!({"content": 123, "tags": "not-array"}),
            "wrong parameter types",
        ),
        (json!({"invalid": "structure"}), "invalid structure"),
        (
            json!({
                "content": "x".repeat(1_000_000),
                "context": "y".repeat(500_000)
            }),
            "oversized parameters",
        ),
    ];
    // Capture the count before the loop consumes the vector.
    let total_invalid_requests = invalid_requests.len();

    let mut error_recovery_count = 0;
    for (invalid_params, test_name) in invalid_requests {
        println!("Testing invalid request: {}", test_name);
        let invalid_result = handlers
            .handle_tool_call("store_memory", invalid_params)
            .await;
        match invalid_result {
            // Accepting an invalid request is reported but does not fail the test.
            Ok(response) => {
                println!(" Unexpected success for {}: {:?}", test_name, response);
            }
            Err(e) => {
                println!(" Expected failure for {}: {}", test_name, e);
                let error_msg = e.to_string().to_lowercase();
                assert!(
                    error_msg.contains("missing")
                        || error_msg.contains("invalid")
                        || error_msg.contains("type")
                        || error_msg.contains("parameter")
                        || error_msg.contains("too large"),
                    "Error should be descriptive: {}",
                    e
                );
            }
        }

        // A valid request right after the bad one must go through.
        let recovery_params = json!({
            "content": format!("Recovery test after {}", test_name),
            "context": "Recovery test context",
            "summary": "Recovery test summary",
            "tags": ["recovery"]
        });
        let recovery_result = handlers
            .handle_tool_call("store_memory", recovery_params)
            .await;
        match recovery_result {
            Ok(response) => {
                println!(" ✅ Recovery successful after {}", test_name);
                error_recovery_count += 1;
                // Content is only verified when the response exposes a string `id`.
                if let Some(id) = response["id"].as_str() {
                    let retrieved = storage
                        .get(uuid::Uuid::parse_str(id)?)
                        .await?
                        .expect("Should retrieve recovery content");
                    assert_eq!(
                        retrieved.content,
                        format!("Recovery test after {}", test_name)
                    );
                }
            }
            Err(e) => {
                println!(" ❌ Recovery failed after {}: {}", test_name, e);
            }
        }
    }

    println!(
        "MCP error recovery results: {}/{} recoveries successful",
        error_recovery_count, total_invalid_requests
    );
    assert_eq!(
        error_recovery_count, total_invalid_requests,
        "Should recover from all error conditions"
    );

    manager.cleanup().await?;
    Ok(())
}
#[tokio::test]
async fn test_partial_failure_handling() -> Result<()> {
let mut manager = TestDatabaseManager::new()?;
let pool = manager.setup_test_database().await?;
let storage = Arc::new(Storage::new(pool));
println!("Testing partial failure handling...");
let large_content = "x".repeat(10_000_000);
let mixed_content = vec![
("Good content 1", true),
("Content with \0 null", false), ("Good content 2", true),
(large_content.as_str(), false), ("Good content 3", true),
("Content with quotes \" and ' marks", true), ];
let mut handles = vec![];
for (i, (content, should_succeed)) in mixed_content.into_iter().enumerate() {
let storage_clone = storage.clone();
let content_string = content.to_string();
let handle = tokio::spawn(async move {
(
i,
storage_clone
.store(
&content_string,
format!("Context for mixed content {}", i),
"Test summary".to_string(),
Some(vec!["mixed-batch".to_string()]),
)
.await,
should_succeed,
)
});
handles.push(handle);
}
let mut expected_successes = 0;
let mut unexpected_failures = 0;
let mut expected_failures = 0;
let mut unexpected_successes = 0;
for handle in handles {
match timeout(Duration::from_secs(30), handle).await {
Ok(Ok((i, Ok(_), should_succeed))) => {
if should_succeed {
expected_successes += 1;
println!(" ✅ Content {} succeeded as expected", i);
} else {
unexpected_successes += 1;
println!(" ⚠️ Content {} succeeded unexpectedly", i);
}
}
Ok(Ok((i, Err(e), should_succeed))) => {
if should_succeed {
unexpected_failures += 1;
println!(" ❌ Content {} failed unexpectedly: {}", i, e);
} else {
expected_failures += 1;
println!(" ✅ Content {} failed as expected: {}", i, e);
}
}
Ok(Err(e)) => {
println!(" ❌ Task failed: {}", e);
unexpected_failures += 1;
}
Err(_) => {
println!(" ⏰ Operation timed out");
unexpected_failures += 1;
}
}
}
println!("Partial failure handling results:");
println!(" Expected successes: {}", expected_successes);
println!(" Expected failures: {}", expected_failures);
println!(" Unexpected successes: {}", unexpected_successes);
println!(" Unexpected failures: {}", unexpected_failures);
let final_stats = storage.stats().await?;
let successful_items = expected_successes + unexpected_successes;
assert!(
final_stats.total_memories >= successful_items as i64,
"Database should contain successful operations"
);
let recovery_id = storage
.store(
"Post-partial-failure test",
"Test context".to_string(),
"Test summary".to_string(),
None,
)
.await?;
let recovery_content = storage
.get(recovery_id)
.await?
.expect("Should retrieve recovery");
assert_eq!(recovery_content.content, "Post-partial-failure test");
manager.cleanup().await?;
Ok(())
}
/// Floods the storage with concurrent writes, then checks that ordinary
/// writes work again after a pause.
///
/// Phase 1 spawns 50 tasks that each store the same payload of roughly
/// 330 KB and awaits each under a 20-second timeout, tallying completions,
/// failures (store errors and join errors) and timeouts. Nothing is asserted
/// on these tallies; they are only printed.
///
/// Phase 2 sleeps for 5 seconds, then issues `RECOVERY_PROBES` small stores,
/// each under a 10-second timeout, and requires at least
/// `MIN_RECOVERY_SUCCESSES` of them to succeed.
#[tokio::test]
async fn test_resource_exhaustion_recovery() -> Result<()> {
    /// Number of small stores issued after the stress phase.
    const RECOVERY_PROBES: usize = 5;
    /// Minimum number of those stores that must succeed.
    const MIN_RECOVERY_SUCCESSES: usize = 3;
    /// Substrings that mark a store error as a resource-pressure symptom.
    const EXHAUSTION_MARKERS: [&str; 3] = ["connection", "timeout", "resource"];

    let mut manager = TestDatabaseManager::new()?;
    let pool = manager.setup_test_database().await?;
    let storage = Arc::new(Storage::new(pool));
    println!("Testing resource exhaustion recovery...");

    let resource_stress_count = 50;
    let large_content = "Resource exhaustion test content ".repeat(10000);
    let mut handles = vec![];
    for i in 0..resource_stress_count {
        let storage_clone = storage.clone();
        // Each spawned task needs its own owned copy of the payload.
        let content_clone = large_content.clone();
        let handle = tokio::spawn(async move {
            storage_clone
                .store(
                    &content_clone,
                    format!("Resource stress context {}", i),
                    "Test summary".to_string(),
                    Some(vec!["resource-stress".to_string()]),
                )
                .await
        });
        handles.push(handle);
    }

    let mut completed = 0;
    let mut failed = 0;
    let mut timed_out = 0;
    for handle in handles {
        // Nesting: timeout result -> join result -> store result.
        match timeout(Duration::from_secs(20), handle).await {
            Ok(Ok(Ok(_))) => completed += 1,
            Ok(Ok(Err(e))) => {
                // Lowercase the message once instead of once per marker.
                let message = e.to_string().to_lowercase();
                if EXHAUSTION_MARKERS
                    .iter()
                    .any(|marker| message.contains(*marker))
                {
                    println!(" Expected resource exhaustion: {}", e);
                }
                failed += 1;
            }
            Ok(Err(e)) => {
                println!(" Task failed: {}", e);
                failed += 1;
            }
            Err(_) => {
                timed_out += 1;
            }
        }
    }
    println!("Resource exhaustion results:");
    println!(" Completed: {}", completed);
    println!(" Failed: {}", failed);
    println!(" Timed out: {}", timed_out);

    println!("Waiting for resource recovery...");
    tokio::time::sleep(Duration::from_secs(5)).await;

    let mut recovery_successes = 0;
    for i in 0..RECOVERY_PROBES {
        let result = timeout(
            Duration::from_secs(10),
            storage.store(
                &format!("Recovery test {}", i),
                "Test context".to_string(),
                "Test summary".to_string(),
                None,
            ),
        )
        .await;
        match result {
            Ok(Ok(_)) => {
                recovery_successes += 1;
                println!(" ✅ Recovery test {} succeeded", i);
            }
            Ok(Err(e)) => {
                println!(" ❌ Recovery test {} failed: {}", i, e);
            }
            Err(_) => {
                println!(" ⏰ Recovery test {} timed out", i);
            }
        }
    }
    println!(
        "Recovery test results: {}/{} operations succeeded",
        recovery_successes, RECOVERY_PROBES
    );
    assert!(
        recovery_successes >= MIN_RECOVERY_SUCCESSES,
        "Should recover from resource exhaustion"
    );

    manager.cleanup().await?;
    Ok(())
}