#[cfg(test)]
#[allow(clippy::module_inception)]
mod tests {
use crate::episode::ExecutionStep;
use crate::extraction::PatternExtractor;
use crate::learning::queue::operations::PatternExtractionQueue;
use crate::learning::queue::types::QueueConfig;
use crate::memory::SelfLearningMemory;
use crate::types::{ExecutionResult, TaskContext, TaskOutcome, TaskType};
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// A queue built from the default configuration starts out empty: its size is
/// zero and both the enqueued and processed counters read zero.
#[tokio::test]
async fn test_queue_creation() {
    let store = Arc::new(SelfLearningMemory::new());
    let queue = PatternExtractionQueue::new(QueueConfig::default(), store);

    assert_eq!(queue.queue_size().await, 0);

    let snapshot = queue.get_stats().await;
    assert_eq!(snapshot.total_enqueued, 0);
    assert_eq!(snapshot.total_processed, 0);
}
/// Enqueuing a single episode id (no workers are started) raises the queue
/// size, the enqueued counter and the reported current queue size to one.
#[tokio::test]
async fn test_enqueue_episode() {
    let store = Arc::new(SelfLearningMemory::new());
    let queue = PatternExtractionQueue::new(QueueConfig::default(), store);

    queue.enqueue_episode(Uuid::new_v4()).await.unwrap();

    assert_eq!(queue.queue_size().await, 1);

    let snapshot = queue.get_stats().await;
    assert_eq!(snapshot.total_enqueued, 1);
    assert_eq!(snapshot.current_queue_size, 1);
}
/// Ten distinct episode ids enqueued back to back (no workers are started)
/// are all counted: the queue size and the enqueued counter both reach ten.
#[tokio::test]
async fn test_multiple_enqueue() {
    let store = Arc::new(SelfLearningMemory::new());
    let queue = PatternExtractionQueue::new(QueueConfig::default(), store);

    // Ids are produced lazily, one per iteration, right before each enqueue.
    for fresh_id in std::iter::repeat_with(Uuid::new_v4).take(10) {
        queue.enqueue_episode(fresh_id).await.unwrap();
    }

    assert_eq!(queue.queue_size().await, 10);

    let snapshot = queue.get_stats().await;
    assert_eq!(snapshot.total_enqueued, 10);
}
#[tokio::test]
async fn test_backpressure_warning() {
let memory = Arc::new(SelfLearningMemory::new());
let config = QueueConfig {
max_queue_size: 5,
..Default::default()
};
let queue = PatternExtractionQueue::new(config, memory);
for _ in 0..10 {
let episode_id = Uuid::new_v4();
let result = queue.enqueue_episode(episode_id).await;
assert!(result.is_ok());
}
let size = queue.queue_size().await;
assert_eq!(size, 10);
}
#[tokio::test]
async fn test_worker_pool_startup() {
let memory = Arc::new(SelfLearningMemory::new());
let config = QueueConfig {
worker_count: 2,
..Default::default()
};
let queue = PatternExtractionQueue::new(config, memory);
queue.start_workers().await;
tokio::time::sleep(Duration::from_millis(50)).await;
let stats = queue.get_stats().await;
assert_eq!(stats.active_workers, 2);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_worker_processes_episodes() {
use crate::types::MemoryConfig;
let test_config = MemoryConfig {
quality_threshold: 0.5,
pattern_extraction_threshold: 1.0, enable_summarization: false, enable_embeddings: false, ..Default::default()
};
let memory = Arc::new(SelfLearningMemory::with_config(test_config));
let context = TaskContext::default();
let episode_id = memory
.start_episode("Test task".to_string(), context, TaskType::Testing)
.await;
for i in 0..20 {
let mut step =
ExecutionStep::new(i + 1, format!("tool_{}", i % 6), format!("Action {i}"));
step.result = Some(ExecutionResult::Success {
output: "OK".to_string(),
});
memory.log_step(episode_id, step).await;
}
memory
.complete_episode(
episode_id,
TaskOutcome::Success {
verdict: "Done".to_string(),
artifacts: vec![],
},
)
.await
.unwrap();
let config = QueueConfig {
worker_count: 1,
poll_interval_ms: 50,
..Default::default()
};
let queue = PatternExtractionQueue::new(config, memory.clone());
queue.start_workers().await;
queue.enqueue_episode(episode_id).await.unwrap();
let emptied = queue.wait_until_empty(Duration::from_secs(2)).await;
assert!(emptied, "Queue should be empty after processing");
let stats = queue.get_stats().await;
assert_eq!(stats.total_enqueued, 1);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_parallel_processing() {
use crate::types::MemoryConfig;
let test_config = MemoryConfig {
quality_threshold: 0.5,
pattern_extraction_threshold: 1.0, enable_summarization: false, enable_embeddings: false, ..Default::default()
};
let memory = Arc::new(SelfLearningMemory::with_config(test_config));
let mut episode_ids = Vec::new();
for i in 0..5 {
let context = TaskContext::default();
let episode_id = memory
.start_episode(format!("Task {i}"), context, TaskType::Testing)
.await;
for j in 0..20 {
let mut step =
ExecutionStep::new(j + 1, format!("tool_{}", j % 6), format!("Action {j}"));
step.result = Some(ExecutionResult::Success {
output: "OK".to_string(),
});
memory.log_step(episode_id, step).await;
}
memory
.complete_episode(
episode_id,
TaskOutcome::Success {
verdict: "Done".to_string(),
artifacts: vec![],
},
)
.await
.unwrap();
episode_ids.push(episode_id);
}
let config = QueueConfig {
worker_count: 3,
poll_interval_ms: 50,
..Default::default()
};
let queue = PatternExtractionQueue::new(config, memory);
queue.start_workers().await;
for episode_id in episode_ids {
queue.enqueue_episode(episode_id).await.unwrap();
}
let emptied = queue.wait_until_empty(Duration::from_secs(30)).await;
assert!(emptied, "All episodes should be processed");
let stats = queue.get_stats().await;
assert_eq!(stats.total_enqueued, 5);
}
/// Smoke test: starts the default worker pool, pauses briefly, then calls
/// `shutdown`. There are no assertions; the test passes as long as the
/// shutdown call returns without panicking.
#[tokio::test]
async fn test_graceful_shutdown() {
    let store = Arc::new(SelfLearningMemory::new());
    let queue = PatternExtractionQueue::new(QueueConfig::default(), store);

    queue.start_workers().await;

    let startup_pause = Duration::from_millis(50);
    tokio::time::sleep(startup_pause).await;

    queue.shutdown().await;
}
/// Extraction for a random id that was never registered with the memory
/// must return an error.
#[tokio::test]
async fn test_extract_from_nonexistent_episode() {
    let memory = Arc::new(SelfLearningMemory::new());
    let extractor = PatternExtractor::new();

    // A freshly generated id; no episode was started on this memory.
    let unknown_id = Uuid::new_v4();

    let result = PatternExtractionQueue::extract_patterns_for_episode(
        &memory,
        &extractor,
        unknown_id,
    )
    .await;

    assert!(result.is_err());
}
/// Extraction for an episode that was started but never completed must
/// return an error.
#[tokio::test]
async fn test_extract_from_incomplete_episode() {
    let memory = Arc::new(SelfLearningMemory::new());

    // Started only: no steps are logged and `complete_episode` is never called.
    let open_episode = memory
        .start_episode(
            "Incomplete".to_string(),
            TaskContext::default(),
            TaskType::Testing,
        )
        .await;

    let extractor = PatternExtractor::new();
    let result = PatternExtractionQueue::extract_patterns_for_episode(
        &memory,
        &extractor,
        open_episode,
    )
    .await;

    assert!(result.is_err());
}
}