use do_memory_core::{
ExecutionResult, ExecutionStep, MemoryConfig, QueueConfig, SelfLearningMemory, TaskContext,
TaskOutcome, TaskType,
};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
fn test_memory_config() -> MemoryConfig {
MemoryConfig {
quality_threshold: 0.0, ..Default::default()
}
}
/// Start an episode and log `step_count` successful steps, returning its id.
///
/// Steps are numbered from 1, tools are named `tool_{index}` (zero-based),
/// and each step records an "OK" success result with a fixed 100 ms latency.
async fn create_test_episode(
    memory: &SelfLearningMemory,
    description: &str,
    step_count: usize,
) -> uuid::Uuid {
    let episode_id = memory
        .start_episode(
            description.to_string(),
            TaskContext::default(),
            TaskType::Testing,
        )
        .await;
    for step_index in 0..step_count {
        let mut step = ExecutionStep::new(
            step_index + 1,
            format!("tool_{step_index}"),
            String::from("Action"),
        );
        step.result = Some(ExecutionResult::Success {
            output: String::from("OK"),
        });
        step.latency_ms = 100;
        memory.log_step(episode_id, step).await;
    }
    episode_id
}
#[tokio::test]
async fn should_extract_patterns_asynchronously_in_background() {
    // Completing an episode with async extraction enabled should enqueue
    // background work rather than processing inline.
    let memory = SelfLearningMemory::with_config(test_memory_config())
        .enable_async_extraction(QueueConfig::default());
    let memory_arc = Arc::new(memory);
    memory_arc.start_workers().await;
    let episode_id = create_test_episode(&memory_arc, "Test task", 3).await;
    memory_arc
        .complete_episode(
            episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    let stats = memory_arc.get_queue_stats().await.unwrap();
    assert_eq!(stats.total_enqueued, 1);
    // Poll with a deadline instead of a single fixed 500 ms sleep: the fixed
    // sleep flaked whenever background extraction took longer, and wasted
    // time when it finished sooner.
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    loop {
        let episode = memory_arc.get_episode(episode_id).await.unwrap();
        if episode.is_complete() && episode.reward.is_some() && episode.reflection.is_some() {
            break;
        }
        assert!(
            std::time::Instant::now() < deadline,
            "background extraction did not finish within 5s"
        );
        sleep(Duration::from_millis(25)).await;
    }
    // Re-check the final state explicitly so assertion failures are readable.
    let episode = memory_arc.get_episode(episode_id).await.unwrap();
    assert!(episode.is_complete());
    assert!(episode.reward.is_some());
    assert!(episode.reflection.is_some());
}
#[tokio::test]
async fn should_complete_faster_with_async_extraction_than_sync() {
    // --- Synchronous baseline: extraction runs inline during completion. ---
    let sync_memory = SelfLearningMemory::with_config(test_memory_config());
    let sync_episode_id = create_test_episode(&sync_memory, "Sync task", 3).await;
    let sync_started = std::time::Instant::now();
    sync_memory
        .complete_episode(
            sync_episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    let sync_duration = sync_started.elapsed();
    let episode_sync = sync_memory.get_episode(sync_episode_id).await.unwrap();
    // Sync extraction should have produced patterns unless the episode was
    // too short (fewer than 2 steps) to yield any.
    assert!(!episode_sync.patterns.is_empty() || episode_sync.steps.len() < 2);
    // --- Async path: completion should only enqueue the extraction work. ---
    let async_memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config())
            .enable_async_extraction(QueueConfig::default()),
    );
    async_memory.start_workers().await;
    let async_episode_id = create_test_episode(&async_memory, "Async task", 3).await;
    let async_started = std::time::Instant::now();
    async_memory
        .complete_episode(
            async_episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    let async_duration = async_started.elapsed();
    // Durations are printed for inspection only; asserting relative timing
    // would be flaky on fast or heavily loaded machines.
    println!("Sync: {sync_duration:?}, Async: {async_duration:?}");
    sleep(Duration::from_millis(500)).await;
    let async_episode = async_memory.get_episode(async_episode_id).await.unwrap();
    assert!(async_episode.is_complete());
}
#[tokio::test]
async fn should_process_multiple_episodes_in_parallel_with_worker_pool() {
    // Four workers draining a batch of episodes enqueued back-to-back.
    let config = QueueConfig {
        worker_count: 4,
        ..Default::default()
    };
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config()).enable_async_extraction(config),
    );
    memory.start_workers().await;
    let episode_count = 10;
    let mut episode_ids = Vec::new();
    for i in 0..episode_count {
        let episode_id = create_test_episode(&memory, &format!("Task {i}"), 3).await;
        memory
            .complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: format!("Done {i}"),
                    artifacts: vec![],
                },
            )
            .await
            .unwrap();
        episode_ids.push(episode_id);
    }
    let stats = memory.get_queue_stats().await.unwrap();
    assert_eq!(stats.total_enqueued, u64::try_from(episode_count).unwrap());
    // Poll until the queue drains (bounded) instead of a hard-coded 2s sleep:
    // the fixed sleep flaked when workers were slow and wasted time when they
    // were fast.
    let expected = u64::try_from(episode_count).unwrap();
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let current = memory.get_queue_stats().await.unwrap();
        if current.current_queue_size == 0 && current.total_processed + current.total_failed >= expected {
            break;
        }
        assert!(
            std::time::Instant::now() < deadline,
            "worker pool did not drain the queue within 10s"
        );
        sleep(Duration::from_millis(50)).await;
    }
    for episode_id in episode_ids {
        let episode = memory.get_episode(episode_id).await.unwrap();
        assert!(episode.is_complete());
    }
    let final_stats = memory.get_queue_stats().await.unwrap();
    println!("Final stats: {final_stats:?}");
}
#[tokio::test]
async fn should_handle_backpressure_when_queue_exceeds_capacity() {
    // One worker with a tiny queue: enqueueing 10 items must exercise the
    // backpressure path without any completion failing.
    let config = QueueConfig {
        worker_count: 1,
        max_queue_size: 5,
        poll_interval_ms: 50,
    };
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config()).enable_async_extraction(config),
    );
    memory.start_workers().await;
    for i in 0..10 {
        let episode_id = create_test_episode(&memory, &format!("Task {i}"), 2).await;
        let result = memory
            .complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: "Done".to_string(),
                    artifacts: vec![],
                },
            )
            .await;
        assert!(result.is_ok());
    }
    let stats = memory.get_queue_stats().await.unwrap();
    assert_eq!(stats.total_enqueued, 10);
    // Poll with a deadline instead of a hard-coded 3s sleep: the fixed sleep
    // flaked on slow machines and wasted wall-clock time on fast ones.
    let deadline = std::time::Instant::now() + Duration::from_secs(10);
    loop {
        let current = memory.get_queue_stats().await.unwrap();
        if current.current_queue_size == 0 {
            break;
        }
        assert!(
            std::time::Instant::now() < deadline,
            "queue did not drain within 10s (size = {})",
            current.current_queue_size
        );
        sleep(Duration::from_millis(50)).await;
    }
    let final_stats = memory.get_queue_stats().await.unwrap();
    assert_eq!(final_stats.current_queue_size, 0);
}
#[tokio::test]
async fn should_recover_from_worker_errors_and_continue_processing() {
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config())
            .enable_async_extraction(QueueConfig::default()),
    );
    memory.start_workers().await;
    // An episode that is started but never completed; the workers must not
    // wedge on it.
    let _incomplete_id = memory
        .start_episode(
            "Incomplete".to_string(),
            TaskContext::default(),
            TaskType::Testing,
        )
        .await;
    // A normally completed episode afterwards should still be processed.
    let complete_id = create_test_episode(&memory, "Complete task", 3).await;
    memory
        .complete_episode(
            complete_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    sleep(Duration::from_millis(500)).await;
    let complete_episode = memory.get_episode(complete_id).await.unwrap();
    assert!(complete_episode.is_complete());
    let stats = memory.get_queue_stats().await.unwrap();
    assert!(stats.total_processed >= 1);
}
#[tokio::test]
#[ignore = "slow integration test - runs for >60s, run explicitly with --ignored"]
async fn should_scale_processing_with_different_worker_counts() {
    // Run the same workload at several pool sizes and report throughput.
    for worker_count in [1, 2, 4, 8] {
        let queue_config = QueueConfig {
            worker_count,
            poll_interval_ms: 10,
            ..Default::default()
        };
        let memory = Arc::new(
            SelfLearningMemory::with_config(test_memory_config())
                .enable_async_extraction(queue_config),
        );
        memory.start_workers().await;
        let episode_count = 20;
        let started = std::time::Instant::now();
        for i in 0..episode_count {
            let episode_id = create_test_episode(&memory, &format!("Task {i}"), 3).await;
            memory
                .complete_episode(
                    episode_id,
                    TaskOutcome::Success {
                        verdict: "Done".to_string(),
                        artifacts: vec![],
                    },
                )
                .await
                .unwrap();
        }
        // Give the pool a fixed window to drain before sampling stats.
        sleep(Duration::from_secs(3)).await;
        let duration = started.elapsed();
        let stats = memory.get_queue_stats().await.unwrap();
        println!(
            "Workers: {}, Episodes: {}, Duration: {:?}, Processed: {}, Failed: {}",
            worker_count, episode_count, duration, stats.total_processed, stats.total_failed
        );
        assert_eq!(stats.current_queue_size, 0, "Queue should be empty");
    }
}
#[tokio::test]
async fn should_track_queue_statistics_accurately() {
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config())
            .enable_async_extraction(QueueConfig::default()),
    );
    memory.start_workers().await;
    // A freshly started queue reports all-zero counters.
    let initial = memory.get_queue_stats().await.unwrap();
    assert_eq!(initial.total_enqueued, 0);
    assert_eq!(initial.total_processed, 0);
    assert_eq!(initial.current_queue_size, 0);
    for i in 0..5 {
        let episode_id = create_test_episode(&memory, &format!("Task {i}"), 3).await;
        memory
            .complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: "Done".to_string(),
                    artifacts: vec![],
                },
            )
            .await
            .unwrap();
    }
    // Every completion should have been counted as an enqueue.
    let after_enqueue = memory.get_queue_stats().await.unwrap();
    assert_eq!(after_enqueue.total_enqueued, 5);
    sleep(Duration::from_secs(1)).await;
    let final_stats = memory.get_queue_stats().await.unwrap();
    assert_eq!(final_stats.current_queue_size, 0);
    // Each item ended up either processed or recorded as a failure.
    assert!(final_stats.total_processed >= 5 || final_stats.total_failed > 0);
}
#[tokio::test]
async fn should_complete_episodes_in_under_100ms_with_async_extraction() {
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config())
            .enable_async_extraction(QueueConfig::default()),
    );
    memory.start_workers().await;
    let episode_id = create_test_episode(&memory, "Performance test", 5).await;
    // Only the enqueue runs on the caller's path, so completion must be fast.
    let started = std::time::Instant::now();
    memory
        .complete_episode(
            episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    let duration = started.elapsed();
    println!("Episode completion time: {duration:?}");
    assert!(
        duration.as_millis() < 100u128,
        "Episode completion took {duration:?}, expected < 100ms"
    );
}
#[tokio::test]
async fn should_work_with_sync_extraction_when_async_disabled() {
    // Without `enable_async_extraction`, extraction runs inline and no queue
    // exists.
    let memory = SelfLearningMemory::with_config(test_memory_config());
    let episode_id = create_test_episode(&memory, "Sync test", 3).await;
    memory
        .complete_episode(
            episode_id,
            TaskOutcome::Success {
                verdict: "Done".to_string(),
                artifacts: vec![],
            },
        )
        .await
        .unwrap();
    // Queue stats are only available when async extraction is enabled.
    let stats = memory.get_queue_stats().await;
    assert!(stats.is_none());
    let episode = memory.get_episode(episode_id).await.unwrap();
    assert!(episode.is_complete());
}
#[tokio::test]
async fn should_handle_concurrent_episode_completions_safely() {
    // Complete several episodes from parallel tasks to check that enqueueing
    // and stats updates are safe under concurrency.
    let memory = Arc::new(
        SelfLearningMemory::with_config(test_memory_config())
            .enable_async_extraction(QueueConfig::default()),
    );
    memory.start_workers().await;
    let mut episode_ids = Vec::new();
    for i in 0..5 {
        let episode_id = create_test_episode(&memory, &format!("Task {i}"), 3).await;
        episode_ids.push(episode_id);
    }
    let mut handles = Vec::new();
    // `uuid::Uuid` is `Copy`; iterating by reference copies each id and avoids
    // the needless whole-Vec allocation the previous `episode_ids.clone()` made.
    for &episode_id in &episode_ids {
        let mem = Arc::clone(&memory);
        let handle = tokio::spawn(async move {
            mem.complete_episode(
                episode_id,
                TaskOutcome::Success {
                    verdict: "Done".to_string(),
                    artifacts: vec![],
                },
            )
            .await
        });
        handles.push(handle);
    }
    // Every spawned completion must succeed (join error and API error both fail).
    for handle in handles {
        handle.await.unwrap().unwrap();
    }
    let stats = memory.get_queue_stats().await.unwrap();
    assert_eq!(stats.total_enqueued, 5);
    sleep(Duration::from_secs(1)).await;
    for episode_id in episode_ids {
        let episode = memory.get_episode(episode_id).await.unwrap();
        assert!(episode.is_complete());
    }
}