use do_memory_core::memory::SelfLearningMemory;
use do_memory_core::{
ComplexityLevel, ExecutionResult, ExecutionStep, MemoryConfig, TaskContext, TaskOutcome,
TaskType,
};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Build a memory instance for tests with the quality gate disabled
/// (`quality_threshold: 0.0`) so every completed episode is retained
/// regardless of its computed quality score.
fn setup_test_memory() -> SelfLearningMemory {
    let config = MemoryConfig {
        quality_threshold: 0.0,
        ..Default::default()
    };
    SelfLearningMemory::with_config(config)
}
/// Build a memory instance from an explicit `MemoryConfig`, letting tests
/// tune limits (e.g. cache size) that `setup_test_memory` leaves at defaults.
fn setup_memory_with_config(config: MemoryConfig) -> SelfLearningMemory {
    SelfLearningMemory::with_config(config)
}
/// Populate a fresh test memory with `n` completed episodes spread across
/// ten domains, three complexity levels, and five tags. Each episode logs
/// between 3 and 5 successful steps before completing successfully.
async fn setup_memory_with_n_episodes(n: usize) -> SelfLearningMemory {
    let memory = setup_test_memory();
    for idx in 0..n {
        // Cycle through the complexity levels so retrieval sees a mix.
        let complexity = if idx % 3 == 0 {
            ComplexityLevel::Simple
        } else if idx % 3 == 1 {
            ComplexityLevel::Moderate
        } else {
            ComplexityLevel::Complex
        };
        let context = TaskContext {
            language: Some("rust".to_string()),
            domain: format!("domain_{}", idx % 10),
            complexity,
            tags: vec![format!("tag_{}", idx % 5)],
            ..Default::default()
        };
        let episode_id = memory
            .start_episode(format!("Task {idx}"), context, TaskType::CodeGeneration)
            .await;
        // 3-5 steps per episode, each reporting success with synthetic
        // latency/token figures.
        for step_idx in 0..(3 + idx % 3) {
            let mut step = ExecutionStep::new(
                step_idx + 1,
                format!("tool_{step_idx}"),
                format!("Action {step_idx}"),
            );
            step.latency_ms = 10 + (step_idx as u64 * 5);
            step.tokens_used = Some(50 + step_idx * 10);
            step.result = Some(ExecutionResult::Success {
                output: format!("Step {step_idx} done"),
            });
            memory.log_step(episode_id, step).await;
        }
        memory
            .complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: format!("Task {idx} completed"),
                    artifacts: vec![],
                },
            )
            .await
            .unwrap();
    }
    memory
}
/// Canonical context used by most tests: a moderate-complexity Rust/tokio
/// task in the "testing" domain tagged "performance".
fn test_context() -> TaskContext {
    TaskContext {
        domain: "testing".to_string(),
        language: Some("rust".to_string()),
        framework: Some("tokio".to_string()),
        tags: vec!["performance".to_string()],
        complexity: ComplexityLevel::Moderate,
    }
}
/// Build a successful execution step for the given step number with fixed
/// latency (10ms) and token usage (50).
fn create_test_step(step_number: usize) -> ExecutionStep {
    let tool = format!("tool_{step_number}");
    let action = format!("Action {step_number}");
    let mut step = ExecutionStep::new(step_number, tool, action);
    step.latency_ms = 10;
    step.tokens_used = Some(50);
    step.result = Some(ExecutionResult::Success {
        output: String::from("Done"),
    });
    step
}
/// Resident set size (VmRSS) of the current process in bytes, parsed from
/// `/proc/self/status`. Returns 0 if the file or the field cannot be read
/// or parsed; callers treat 0 as "measurement unavailable".
#[cfg(target_os = "linux")]
fn get_current_memory_usage() -> usize {
    let status = std::fs::read_to_string("/proc/self/status").unwrap_or_default();
    status
        .lines()
        .find(|line| line.starts_with("VmRSS:"))
        // Field layout is "VmRSS:  <kb> kB" — the second token is the value.
        .and_then(|line| line.split_whitespace().nth(1))
        .and_then(|kb| kb.parse::<usize>().ok())
        .map_or(0, |kb| kb * 1024)
}
/// Fallback for non-Linux targets: memory usage is unavailable, so return 0.
/// The leak tests treat a 0 baseline as "skip the memory-growth assertions".
#[cfg(not(target_os = "linux"))]
fn get_current_memory_usage() -> usize {
0
}
/// P95 retrieval latency must stay under 100ms with 100 stored episodes.
///
/// Fixes: the percentile index was computed via lossy `f32` casts that
/// required three clippy `allow`s — integer arithmetic is exact and needs
/// none. Also drops a redundant `context.clone()` that both allocated
/// needlessly and inflated the timed region.
#[tokio::test]
async fn should_retrieve_episodes_under_100ms_p95_with_100_episodes() {
    let memory = setup_memory_with_n_episodes(100).await;
    let mut latencies = Vec::with_capacity(100);
    for i in 0..100 {
        let context = TaskContext {
            domain: format!("domain_{}", i % 10),
            ..Default::default()
        };
        let start = Instant::now();
        let _ = memory
            .retrieve_relevant_context(format!("test query {i}"), context, 10)
            .await;
        latencies.push(start.elapsed());
    }
    latencies.sort();
    // Exact integer percentile index; clamped to the last element.
    let p95_index = (latencies.len() * 95 / 100).min(latencies.len() - 1);
    let p95 = latencies[p95_index];
    println!("P95 latency with 100 episodes: {p95:?}");
    assert!(
        p95.as_millis() < 100,
        "P95 retrieval latency {}ms exceeds 100ms target",
        p95.as_millis()
    );
}
/// P95 retrieval latency must stay under 100ms even with 10K stored
/// episodes. Ignored by default because population takes a long time.
///
/// Fixes (mirroring the 100-episode variant): lossy `f32` percentile-index
/// casts replaced with exact integer arithmetic (removing three clippy
/// `allow`s), and a redundant `context.clone()` inside the timed region
/// removed.
#[tokio::test]
#[ignore = "Long-running test - run with --include-ignored for full validation"]
async fn should_retrieve_episodes_under_100ms_p95_with_10k_episodes() {
    let memory = setup_memory_with_n_episodes(10000).await;
    let mut latencies = Vec::with_capacity(100);
    for i in 0..100 {
        let context = TaskContext {
            domain: format!("domain_{}", i % 10),
            ..Default::default()
        };
        let start = Instant::now();
        let _ = memory
            .retrieve_relevant_context(format!("test query {i}"), context, 10)
            .await;
        latencies.push(start.elapsed());
    }
    latencies.sort();
    // Exact integer percentile index; clamped to the last element.
    let p95_index = (latencies.len() * 95 / 100).min(latencies.len() - 1);
    let p95 = latencies[p95_index];
    println!("P95 latency with 10K episodes: {p95:?}");
    assert!(
        p95.as_millis() < 100,
        "P95 retrieval latency {}ms exceeds 100ms target with 10K episodes",
        p95.as_millis()
    );
}
/// Sample 100 retrievals against 500 episodes and report the latency
/// distribution; only P95 is asserted (under 100ms), the other percentiles
/// are printed for diagnostics.
///
/// Fix: the retrieval result was silently discarded without `let _ =`,
/// inconsistent with the sibling latency tests (and a warning if the API
/// is ever marked `#[must_use]`).
#[tokio::test]
async fn should_maintain_consistent_retrieval_latency_across_percentiles() {
    let memory = setup_memory_with_n_episodes(500).await;
    let mut latencies = Vec::with_capacity(100);
    for _ in 0..100 {
        let start = Instant::now();
        let _ = memory
            .retrieve_relevant_context("query".to_string(), test_context(), 10)
            .await;
        latencies.push(start.elapsed());
    }
    latencies.sort();
    // Direct indexing assumes exactly 100 samples (see loop above).
    let p50 = latencies[50];
    let p90 = latencies[90];
    let p95 = latencies[95];
    let p99 = latencies[99];
    println!("Retrieval latency percentiles:");
    println!(" P50: {p50:?}");
    println!(" P90: {p90:?}");
    println!(" P95: {p95:?}");
    println!(" P99: {p99:?}");
    assert!(p95.as_millis() < 100, "P95 should be under 100ms");
}
/// Store 1K minimal episodes sequentially, then verify a retrieval still
/// completes inside the 100ms budget (i.e. storage volume does not degrade
/// lookup performance).
#[tokio::test]
async fn should_store_1000_episodes_without_performance_degradation() {
    let memory = setup_test_memory();
    let start = Instant::now();
    for i in 0..1000 {
        let episode_id = memory
            .start_episode(
                format!("Task {i}"),
                test_context(),
                TaskType::CodeGeneration,
            )
            .await;
        memory.log_step(episode_id, create_test_step(1)).await;
        let outcome = TaskOutcome::Success {
            verdict: "Done".to_string(),
            artifacts: vec![],
        };
        memory.complete_episode(episode_id, outcome).await.unwrap();
    }
    let storage_time = start.elapsed();
    println!("Stored 1K episodes in {storage_time:?}");
    // Time a single retrieval against the fully-populated store.
    let retrieval_start = Instant::now();
    let results = memory
        .retrieve_relevant_context("test".to_string(), test_context(), 10)
        .await;
    let retrieval_time = retrieval_start.elapsed();
    assert!(!results.is_empty());
    assert!(
        retrieval_time.as_millis() < 100,
        "Retrieval degraded to {}ms with 1K episodes",
        retrieval_time.as_millis()
    );
}
/// Store 10K minimal episodes, then verify a retrieval still completes
/// inside the 100ms budget. Ignored by default: the population loop is slow.
#[tokio::test]
#[ignore = "Long-running test - run with --include-ignored for full validation"]
async fn should_store_10000_episodes_without_performance_degradation() {
    let memory = setup_test_memory();
    let start = Instant::now();
    for i in 0..10000 {
        // Progress marker every 1K episodes so long runs are observable.
        if i % 1000 == 0 {
            println!("Progress: {i}/10000 episodes");
        }
        let episode_id = memory
            .start_episode(
                format!("Task {i}"),
                test_context(),
                TaskType::CodeGeneration,
            )
            .await;
        memory.log_step(episode_id, create_test_step(1)).await;
        let outcome = TaskOutcome::Success {
            verdict: "Done".to_string(),
            artifacts: vec![],
        };
        memory.complete_episode(episode_id, outcome).await.unwrap();
    }
    let storage_time = start.elapsed();
    println!("Stored 10K episodes in {storage_time:?}");
    // Time a single retrieval against the fully-populated store.
    let retrieval_start = Instant::now();
    let results = memory
        .retrieve_relevant_context("test".to_string(), test_context(), 10)
        .await;
    let retrieval_time = retrieval_start.elapsed();
    assert!(!results.is_empty());
    assert!(
        retrieval_time.as_millis() < 100,
        "Retrieval degraded to {}ms with 10K episodes",
        retrieval_time.as_millis()
    );
}
/// Average episode-creation latency over 100 episodes must stay under 10ms.
///
/// Fix: the divisor conversion used `unwrap_or(1)`, which on (theoretical)
/// failure would silently report the *total* time as the average. Replaced
/// with `expect`, matching the step-logging benchmark's style.
#[tokio::test]
async fn should_create_episodes_very_quickly() {
    let memory = setup_test_memory();
    let mut creation_times = Vec::with_capacity(100);
    for i in 0..100 {
        let start = Instant::now();
        memory
            .start_episode(
                format!("Task {i}"),
                test_context(),
                TaskType::CodeGeneration,
            )
            .await;
        creation_times.push(start.elapsed());
    }
    // 100 always fits in u32; a failure here would be a test bug.
    let count = u32::try_from(creation_times.len()).expect("test size fits in u32");
    let avg_time: Duration = creation_times.iter().sum::<Duration>() / count;
    println!("Average episode creation time: {avg_time:?}");
    assert!(
        avg_time.as_millis() < 10,
        "Average creation time {}ms too slow",
        avg_time.as_millis()
    );
}
/// Placeholder for the 70% pattern-recognition accuracy target.
/// Intentionally empty and ignored: kept so the target stays visible in the
/// test list until measurement infrastructure exists to fill in the body.
#[tokio::test]
#[ignore = "Requires pattern accuracy measurement infrastructure"]
async fn should_achieve_70_percent_pattern_recognition_accuracy() {
}
/// Indirect coverage check: verify the repository's CI workflow wires up
/// coverage reporting (cargo-llvm-cov or similar). Prints a notice instead
/// of failing when the workflow file is absent (e.g. standalone checkouts).
#[tokio::test]
async fn should_maintain_90_percent_test_coverage() {
    // NOTE(review): gated off Windows in the original — presumably due to
    // path/environment differences in CI; confirm before removing the gate.
    #[cfg(not(target_os = "windows"))]
    {
        let cwd = std::env::current_dir().unwrap();
        let workspace_root = cwd.parent().unwrap();
        let ci_workflow_path = workspace_root.join(".github/workflows/ci-enhanced.yml");
        if ci_workflow_path.exists() {
            let ci_workflow = std::fs::read_to_string(&ci_workflow_path).unwrap();
            let has_coverage =
                ci_workflow.contains("cargo-llvm-cov") || ci_workflow.contains("coverage");
            assert!(has_coverage, "CI workflow should include coverage reporting");
        } else {
            println!("CI workflow not found at {ci_workflow_path:?}");
        }
    }
}
/// Run 100 full episode lifecycles and compare process RSS before and
/// after. The growth assertion only fires when a real baseline is
/// available (get_current_memory_usage returns 0 off-Linux).
#[tokio::test]
async fn should_not_leak_memory_under_continuous_operation() {
    let memory = Arc::new(setup_test_memory());
    let initial_memory = get_current_memory_usage();
    println!("Initial memory: {initial_memory} bytes");
    for i in 0..100 {
        let mem = memory.clone();
        let episode_id = mem
            .start_episode(format!("Task {i}"), test_context(), TaskType::Testing)
            .await;
        for step_no in 1..=5 {
            mem.log_step(episode_id, create_test_step(step_no)).await;
        }
        let outcome = TaskOutcome::Success {
            verdict: "Done".to_string(),
            artifacts: vec![],
        };
        mem.complete_episode(episode_id, outcome).await.unwrap();
    }
    let final_memory = get_current_memory_usage();
    println!("Final memory: {final_memory} bytes");
    if initial_memory > 0 {
        #[allow(clippy::cast_precision_loss)]
        let growth = (final_memory as f32 - initial_memory as f32) / initial_memory as f32;
        println!("Memory growth: {:.2}%", growth * 100.0);
        // 2.0 == 200% growth: a deliberately generous bound that catches
        // only gross leaks, not allocator noise.
        assert!(
            growth < 2.0,
            "Memory grew by {:.2}% - possible leak",
            growth * 100.0
        );
    }
}
/// Like the continuous-operation leak test, but samples RSS every 25
/// iterations with a tighter 100% growth bound so a steady climb fails
/// fast. Checks are skipped when no baseline is available (non-Linux).
#[tokio::test]
async fn should_not_leak_memory_over_iterations() {
    let memory = Arc::new(setup_test_memory());
    let initial_memory = get_current_memory_usage();
    for i in 0..100 {
        let mem = memory.clone();
        let episode_id = mem
            .start_episode(format!("Task {i}"), test_context(), TaskType::Testing)
            .await;
        for step_no in 1..=5 {
            mem.log_step(episode_id, create_test_step(step_no)).await;
        }
        mem.complete_episode(
            episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
        // Periodic checkpoint (skips iteration 0, which has no history yet).
        if i > 0 && i % 25 == 0 && initial_memory > 0 {
            let current_memory = get_current_memory_usage();
            #[allow(clippy::cast_precision_loss)]
            let growth = (current_memory as f32 - initial_memory as f32) / initial_memory as f32;
            println!("Iteration {}: Memory growth {:.2}%", i, growth * 100.0);
            assert!(
                growth < 1.0,
                "Memory grew by {:.2}% after {} iterations - possible leak",
                growth * 100.0,
                i
            );
        }
        drop(mem);
    }
    println!("Memory leak test completed successfully over 100 iterations");
}
/// Cap the in-memory episode cache at 100 entries, store 200 episodes, and
/// confirm the stats still account for every episode — cache eviction must
/// not lose records.
#[tokio::test]
#[ignore = "slow integration test - run with --ignored or in release CI"]
async fn should_cleanup_cache_when_exceeding_limits() {
    let config = MemoryConfig {
        storage: do_memory_core::StorageConfig {
            max_episodes_cache: 100,
            ..Default::default()
        },
        quality_threshold: 0.0,
        ..Default::default()
    };
    let memory = setup_memory_with_config(config);
    // Store double the cache limit to force eviction.
    for i in 0..200 {
        let episode_id = memory
            .start_episode(format!("Task {i}"), test_context(), TaskType::Testing)
            .await;
        let outcome = TaskOutcome::Success {
            verdict: "Done".to_string(),
            artifacts: vec![],
        };
        memory.complete_episode(episode_id, outcome).await.unwrap();
    }
    let (total, completed, _) = memory.get_stats().await;
    assert_eq!(total, 200);
    assert_eq!(completed, 200);
}
/// Spawn 100 tasks that each create one episode concurrently; every task
/// must yield an id and the whole batch must finish within one second.
#[tokio::test]
async fn should_create_episodes_concurrently_without_conflicts() {
    let memory = Arc::new(setup_test_memory());
    let start = Instant::now();
    let handles: Vec<_> = (0..100)
        .map(|i| {
            let mem = memory.clone();
            tokio::spawn(async move {
                mem.start_episode(
                    format!("Task {i}"),
                    test_context(),
                    TaskType::CodeGeneration,
                )
                .await
            })
        })
        .collect();
    // Join every task; a panic inside a task surfaces here via unwrap.
    let mut ids = Vec::with_capacity(handles.len());
    for handle in handles {
        ids.push(handle.await.unwrap());
    }
    let elapsed = start.elapsed();
    println!(
        "Created 100 episodes concurrently in {:?} ({:.2} eps/sec)",
        elapsed,
        100.0 / elapsed.as_secs_f32()
    );
    assert_eq!(ids.len(), 100);
    assert!(
        elapsed.as_secs() < 1,
        "Concurrent creation took {}ms",
        elapsed.as_millis()
    );
}
/// Create 50 episodes sequentially, then complete them all from parallel
/// tasks; the completion count reported by the stats must match.
#[tokio::test]
#[ignore = "slow integration test - run with --ignored or in release CI"]
async fn should_complete_episodes_concurrently_without_conflicts() {
    let memory = Arc::new(setup_test_memory());
    let mut episode_ids = Vec::with_capacity(50);
    for i in 0..50 {
        let id = memory
            .start_episode(
                format!("Task {i}"),
                test_context(),
                TaskType::CodeGeneration,
            )
            .await;
        episode_ids.push(id);
    }
    let start = Instant::now();
    let handles: Vec<_> = episode_ids
        .into_iter()
        .map(|episode_id| {
            let mem = memory.clone();
            tokio::spawn(async move {
                mem.complete_episode(
                    episode_id,
                    TaskOutcome::Success {
                        verdict: "Done".to_string(),
                        artifacts: vec![],
                    },
                )
                .await
            })
        })
        .collect();
    // First unwrap: the task didn't panic; second: completion succeeded.
    for handle in handles {
        handle.await.unwrap().unwrap();
    }
    let elapsed = start.elapsed();
    println!("Completed 50 episodes concurrently in {elapsed:?}");
    let (_, completed, _) = memory.get_stats().await;
    assert_eq!(completed, 50);
}
/// Run 50 parallel retrievals against 100 stored episodes; each must
/// respect the limit of 10 results and the whole batch must finish within
/// 500ms.
#[tokio::test]
#[ignore = "slow integration test - run with --ignored or in release CI"]
async fn should_handle_concurrent_retrievals_efficiently() {
    let memory = Arc::new(setup_memory_with_n_episodes(100).await);
    let start = Instant::now();
    let handles: Vec<_> = (0..50)
        .map(|i| {
            let mem = memory.clone();
            tokio::spawn(async move {
                mem.retrieve_relevant_context(format!("query {i}"), test_context(), 10)
                    .await
            })
        })
        .collect();
    for handle in handles {
        let results = handle.await.unwrap();
        assert!(results.len() <= 10);
    }
    let elapsed = start.elapsed();
    println!("Executed 50 concurrent retrievals in {elapsed:?}");
    assert!(
        elapsed.as_millis() < 500,
        "Concurrent retrievals took {}ms",
        elapsed.as_millis()
    );
}
/// Average step-logging latency over 100 steps must stay under 5ms. Step
/// construction happens outside the timed window so only log_step is
/// measured.
#[tokio::test]
async fn should_log_steps_very_quickly() {
    let memory = setup_test_memory();
    let episode_id = memory
        .start_episode("Test".to_string(), test_context(), TaskType::Testing)
        .await;
    let mut step_times = Vec::with_capacity(100);
    for step_no in 1..=100 {
        let step = create_test_step(step_no);
        let start = Instant::now();
        memory.log_step(episode_id, step).await;
        step_times.push(start.elapsed());
    }
    let count = u32::try_from(step_times.len()).expect("Test size fits in u32");
    let avg_time: Duration = step_times.iter().sum::<Duration>() / count;
    println!("Average step logging time: {avg_time:?}");
    assert!(
        avg_time.as_millis() < 5,
        "Step logging too slow: {}ms",
        avg_time.as_millis()
    );
}
/// Completing an episode triggers pattern extraction; the average
/// completion time over 50 three-step episodes must stay under 100ms.
///
/// Fix: the divisor conversion used `unwrap_or(1)`, which on (theoretical)
/// failure would silently report the *total* time as the average. Replaced
/// with `expect`, matching the step-logging benchmark's style.
#[tokio::test]
#[ignore = "slow integration test - run with --ignored or in release CI"]
async fn should_complete_episodes_quickly_with_pattern_extraction() {
    let memory = setup_test_memory();
    let mut completion_times = Vec::with_capacity(50);
    for i in 0..50 {
        let episode_id = memory
            .start_episode(
                format!("Task {i}"),
                test_context(),
                TaskType::CodeGeneration,
            )
            .await;
        for j in 1..=3 {
            memory.log_step(episode_id, create_test_step(j)).await;
        }
        // Only the complete_episode call (which extracts patterns) is timed.
        let start = Instant::now();
        memory
            .complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: "Done".to_string(),
                    artifacts: vec![],
                },
            )
            .await
            .unwrap();
        completion_times.push(start.elapsed());
    }
    // 50 always fits in u32; a failure here would be a test bug.
    let count = u32::try_from(completion_times.len()).expect("test size fits in u32");
    let avg_time: Duration = completion_times.iter().sum::<Duration>() / count;
    println!("Average episode completion time: {avg_time:?}");
    assert!(
        avg_time.as_millis() < 100,
        "Episode completion too slow: {}ms",
        avg_time.as_millis()
    );
}