#[cfg(test)]
mod planner_mod_tests {
use super::super::guard::GuardResult;
use super::super::stall_detection::{RecoveryStrategy, StallEvent, StallType};
use super::super::state_machine::{WorkflowState, WorkflowType};
use super::super::telemetry::ExportFormat;
use super::super::*;
use anyhow::Result;
use std::time::Duration;
use tokio::time::timeout;
use uuid::Uuid;
/// Returns the planner configuration preset used by most tests in this module.
///
/// Thin wrapper around `PlannerConfig::test()` so every test builds its
/// configuration through one call site.
fn create_test_planner_config() -> PlannerConfig {
    PlannerConfig::test()
}
/// Returns the production planner configuration preset.
///
/// Thin wrapper around `PlannerConfig::production()`.
fn create_production_config() -> PlannerConfig {
    PlannerConfig::production()
}
/// Returns the development planner configuration preset.
///
/// Thin wrapper around `PlannerConfig::development()`.
fn create_development_config() -> PlannerConfig {
    PlannerConfig::development()
}
/// Builds a small, medium-priority goal with a single constraint and a single
/// success criterion.
fn create_simple_test_goal() -> Goal {
    let constraints = vec![String::from("No external dependencies")];
    let success_criteria = vec![String::from("Task completes successfully")];

    Goal::new("Simple test goal")
        .with_context("Test environment")
        .with_constraints(constraints)
        .with_success_criteria(success_criteria)
        .with_priority(Priority::Medium)
}
/// Builds a high-priority goal carrying four constraints and four success
/// criteria, for tests that need a goal with several of each.
fn create_complex_test_goal() -> Goal {
    let constraints = vec![
        String::from("Zero downtime"),
        String::from("< 100ms response time"),
        String::from("SOC2 compliance"),
        String::from("Budget limit $50k"),
    ];
    let success_criteria = vec![
        String::from("Performance improved by 50%"),
        String::from("Security audit passes"),
        String::from("Zero data loss"),
        String::from("Full rollback capability"),
    ];

    Goal::new("Complex multi-phase system optimization with performance monitoring and security compliance")
        .with_context("Production environment with high availability requirements")
        .with_constraints(constraints)
        .with_success_criteria(success_criteria)
        .with_priority(Priority::High)
}
/// Builds a critical-priority goal describing an authentication/encryption
/// deployment, with three compliance-style constraints and two success
/// criteria.
fn create_security_sensitive_goal() -> Goal {
    let constraints = vec![
        String::from("GDPR compliance"),
        String::from("Multi-factor authentication"),
        String::from("Audit logging required"),
    ];
    let success_criteria = vec![
        String::from("Zero security vulnerabilities"),
        String::from("Compliance certification"),
    ];

    Goal::new("Deploy authentication system with database encryption")
        .with_context("Financial services environment")
        .with_constraints(constraints)
        .with_success_criteria(success_criteria)
        .with_priority(Priority::Critical)
}
/// A controller built from the test configuration must construct successfully
/// and report `"mock"` as its AI provider.
#[tokio::test]
async fn test_planner_controller_creation() -> Result<()> {
    let controller = PlannerExecutorController::new(create_test_planner_config()).await?;

    assert_eq!(controller.config.ai_config.provider, "mock");
    Ok(())
}
/// Builds a controller from each configuration preset (test, development,
/// production) and checks the preset-specific settings the controller exposes
/// through its `config` field.
///
/// Returns an error if any controller fails to construct.
#[tokio::test]
async fn test_different_configuration_modes() -> Result<()> {
    let configs = vec![
        ("test", create_test_planner_config()),
        ("development", create_development_config()),
        ("production", create_production_config()),
    ];

    for (name, config) in configs {
        // Each iteration owns its `config`, so it is moved rather than cloned.
        let controller = PlannerExecutorController::new(config).await?;
        let effective = &controller.config;

        match name {
            "test" => {
                assert_eq!(effective.ai_config.provider, "mock");
                assert_eq!(effective.execution_config.max_workflow_duration_hours, 1);
            }
            "development" => {
                assert!(!effective.security_config.enable_policy_validation);
                assert_eq!(effective.execution_config.max_concurrent_workflows, 2);
            }
            "production" => {
                assert!(effective.security_config.enable_policy_validation);
                assert!(effective.security_config.enable_security_analysis);
                assert_eq!(effective.ai_config.provider, "claude");
            }
            // Adding a preset to `configs` without assertions should fail
            // loudly instead of passing silently.
            other => unreachable!("no assertions defined for preset {:?}", other),
        }
    }

    Ok(())
}
/// Zeroes `max_tokens` and `timeout_seconds` in the AI configuration and
/// asserts that controller construction still succeeds.
///
/// NOTE(review): despite the "invalid" in the name, this pins the current
/// behavior that construction accepts these values — confirm that is intended.
#[tokio::test]
async fn test_controller_with_invalid_configuration() -> Result<()> {
    let mut config = create_test_planner_config();
    config.ai_config.timeout_seconds = 0;
    config.ai_config.max_tokens = 0;

    let controller = PlannerExecutorController::new(config).await;

    assert!(controller.is_ok());
    Ok(())
}
/// Submits a simple goal and checks that the workflow can be looked up by the
/// returned id, carrying the submitted goal's id and the `Simple` workflow
/// type.
///
/// Returns an error if construction, submission, or the status lookup fails.
#[tokio::test]
async fn test_simple_goal_submission() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;
    let goal = create_simple_test_goal();

    // `goal` is cloned because its id is compared against the stored copy below.
    let workflow_id = controller
        .submit_goal(goal.clone(), WorkflowType::Simple)
        .await?;
    assert!(!workflow_id.to_string().is_empty());

    // Short pause before querying the status of the freshly submitted workflow.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let workflow_context = controller
        .get_workflow_status(workflow_id)
        .await?
        .expect("submitted workflow should be retrievable by its id");

    assert_eq!(workflow_context.goal.id, goal.id);
    assert_eq!(workflow_context.workflow_type, WorkflowType::Simple);
    Ok(())
}
/// Submits the complex goal as a `ComplexOrchestration` workflow and checks
/// that the stored context keeps the workflow type and carries more than one
/// constraint and more than one success criterion.
///
/// Returns an error if construction, submission, or the status lookup fails.
#[tokio::test]
async fn test_complex_goal_submission() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;
    let goal = create_complex_test_goal();

    let workflow_id = controller
        .submit_goal(goal.clone(), WorkflowType::ComplexOrchestration)
        .await?;

    // Short pause before querying the status of the freshly submitted workflow.
    tokio::time::sleep(Duration::from_millis(200)).await;

    let workflow_context = controller
        .get_workflow_status(workflow_id)
        .await?
        .expect("submitted workflow should be retrievable by its id");

    assert_eq!(
        workflow_context.workflow_type,
        WorkflowType::ComplexOrchestration
    );
    assert!(workflow_context.goal.constraints.len() > 1);
    assert!(workflow_context.goal.success_criteria.len() > 1);
    Ok(())
}
#[tokio::test]
async fn test_multiple_concurrent_goals() -> Result<()> {
let config = create_test_planner_config();
let controller = PlannerExecutorController::new(config).await?;
let goals = vec![
create_simple_test_goal(),
create_simple_test_goal(),
create_simple_test_goal(),
];
let mut workflow_ids = Vec::new();
for goal in goals {
let workflow_id = controller.submit_goal(goal, WorkflowType::Simple).await?;
workflow_ids.push(workflow_id);
}
assert_eq!(workflow_ids.len(), 3);
let unique_ids: std::collections::HashSet<_> = workflow_ids.iter().collect();
assert_eq!(unique_ids.len(), 3);
tokio::time::sleep(Duration::from_secs(2)).await;
let active_workflows = controller.list_active_workflows().await?;
assert!(active_workflows.len() >= 1); Ok(())
}
/// Configures `max_concurrent_workflows = 1`, submits two simple goals, and
/// checks that they receive distinct ids and that no more than two workflows
/// are listed as active afterwards.
///
/// NOTE(review): only two goals are submitted, so the `<= 2` bound cannot fail
/// because of the configured limit of 1. Whether over-limit workflows are
/// queued, rejected, or still listed as active is not visible here — confirm
/// the intended semantics before tightening the assertion.
#[tokio::test]
async fn test_workflow_concurrency_limits() -> Result<()> {
    let mut config = create_test_planner_config();
    config.execution_config.max_concurrent_workflows = 1;
    let controller = PlannerExecutorController::new(config).await?;

    // Neither goal is reused after submission, so each is moved, not cloned.
    let first_goal = create_simple_test_goal();
    let second_goal = create_simple_test_goal();

    let workflow_id1 = controller
        .submit_goal(first_goal, WorkflowType::Simple)
        .await?;
    let workflow_id2 = controller
        .submit_goal(second_goal, WorkflowType::Simple)
        .await?;
    assert_ne!(workflow_id1, workflow_id2);

    tokio::time::sleep(Duration::from_millis(200)).await;

    let active_workflows = controller.list_active_workflows().await?;
    assert!(active_workflows.len() <= 2);
    Ok(())
}
/// Polls a submitted workflow up to 20 times at 100 ms intervals, recording
/// every `(from, to)` state change relative to an assumed starting state of
/// `Initializing`.
///
/// Polling stops early once `Completed` or `Failed` is observed. The test
/// requires at least one recorded transition, and the first recorded one must
/// start from `Initializing`.
#[tokio::test]
async fn test_workflow_state_transitions() -> Result<()> {
    let controller = PlannerExecutorController::new(create_test_planner_config()).await?;
    let workflow_id = controller
        .submit_goal(create_simple_test_goal(), WorkflowType::Simple)
        .await?;

    let mut state_changes = Vec::new();
    let mut previous_state = WorkflowState::Initializing;

    for _ in 0..20 {
        tokio::time::sleep(Duration::from_millis(100)).await;

        // A missing status is skipped; the next poll simply tries again.
        let status = match controller.get_workflow_status(workflow_id).await? {
            Some(status) => status,
            None => continue,
        };
        let observed = status.current_state.clone();

        if observed != previous_state {
            // Swap the new state in and record the pair in one step.
            let before = std::mem::replace(&mut previous_state, observed.clone());
            state_changes.push((before, observed.clone()));
        }

        let finished = matches!(
            observed,
            WorkflowState::Completed | WorkflowState::Failed
        );
        if finished {
            break;
        }
    }

    assert!(!state_changes.is_empty(), "No state transitions observed");
    assert_eq!(state_changes[0].0, WorkflowState::Initializing);
    Ok(())
}
/// Polls a submitted workflow every 200 ms for up to 30 seconds. When a
/// terminal state (`Completed` or `Failed`) is observed, the execution history
/// must be non-empty.
///
/// NOTE(review): the test also passes if the status disappears or if the 30 s
/// budget runs out without a terminal state — it does not itself assert that
/// the workflow completes.
#[tokio::test]
async fn test_workflow_completion() -> Result<()> {
    let controller = PlannerExecutorController::new(create_test_planner_config()).await?;
    let workflow_id = controller
        .submit_goal(create_simple_test_goal(), WorkflowType::Simple)
        .await?;

    let completion_timeout = Duration::from_secs(30);
    let start_time = tokio::time::Instant::now();

    while start_time.elapsed() < completion_timeout {
        tokio::time::sleep(Duration::from_millis(200)).await;

        match controller.get_workflow_status(workflow_id).await? {
            // The workflow is no longer tracked; nothing left to observe.
            None => break,
            Some(context) => {
                let terminal = matches!(
                    context.current_state,
                    WorkflowState::Completed | WorkflowState::Failed
                );
                if terminal {
                    assert!(!context.execution_history.is_empty());
                    break;
                }
            }
        }
    }

    Ok(())
}
#[tokio::test]
async fn test_workflow_failure_handling() -> Result<()> {
let config = create_test_planner_config();
let controller = PlannerExecutorController::new(config).await?;
let mut problematic_goal = create_simple_test_goal();
problematic_goal.description = "".to_string(); problematic_goal.success_criteria.clear();
let workflow_id = controller
.submit_goal(problematic_goal, WorkflowType::Simple)
.await?;
let timeout_duration = Duration::from_secs(10);
let mut final_state = None;
let result = timeout(timeout_duration, async {
loop {
tokio::time::sleep(Duration::from_millis(200)).await;
if let Some(status) = controller.get_workflow_status(workflow_id).await.unwrap() {
match status.current_state {
WorkflowState::Failed => {
final_state = Some(WorkflowState::Failed);
break;
}
WorkflowState::Completed => {
final_state = Some(WorkflowState::Completed);
break;
}
_ => continue,
}
} else {
break;
}
}
})
.await;
if result.is_err() && final_state.is_none() {
let status = controller.get_workflow_status(workflow_id).await?;
assert!(
status.is_some(),
"Workflow status unavailable after failure handling timeout"
);
}
Ok(())
}
/// Enables policy validation and security analysis, submits the
/// security-sensitive goal as a `ComplexOrchestration` workflow, and checks
/// that the workflow type is preserved when a status is available.
///
/// NOTE(review): no assertion is made when the status lookup returns `None`.
#[tokio::test]
async fn test_security_policy_validation() -> Result<()> {
    let mut config = create_test_planner_config();
    config.security_config.enable_policy_validation = true;
    config.security_config.enable_security_analysis = true;
    let controller = PlannerExecutorController::new(config).await?;

    let security_goal = create_security_sensitive_goal();
    let workflow_id = controller
        .submit_goal(security_goal, WorkflowType::ComplexOrchestration)
        .await?;

    tokio::time::sleep(Duration::from_millis(300)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    if let Some(context) = status {
        // `assert_eq!` reports both values on failure, unlike `assert!(a == b)`.
        assert_eq!(context.workflow_type, WorkflowType::ComplexOrchestration);
    }
    Ok(())
}
/// Enables capability restrictions with an allow-list of two capabilities,
/// submits a simple goal, and checks that the workflow status is available
/// 200 ms later.
#[tokio::test]
async fn test_capability_restrictions() -> Result<()> {
    let allowed = vec!["fs.read.v1".to_string(), "analysis.system.v1".to_string()];

    let mut config = create_test_planner_config();
    config.security_config.enable_capability_restrictions = true;
    config.security_config.allowed_capabilities = allowed;
    let controller = PlannerExecutorController::new(config).await?;

    let workflow_id = controller
        .submit_goal(create_simple_test_goal(), WorkflowType::Simple)
        .await?;

    tokio::time::sleep(Duration::from_millis(200)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    assert!(status.is_some());
    Ok(())
}
/// Sets `max_execution_time_seconds = 5`, submits the complex goal, waits
/// 7 seconds, and — if the workflow is still reported as `Executing` —
/// requires that its execution history is non-empty.
///
/// NOTE(review): no assertion is made for any other state or for a missing
/// status, so this does not itself verify that the time limit is enforced.
#[tokio::test]
async fn test_execution_timeout_limits() -> Result<()> {
    let mut config = create_test_planner_config();
    config.security_config.max_execution_time_seconds = 5;
    let controller = PlannerExecutorController::new(config).await?;

    let goal = create_complex_test_goal();
    let workflow_id = controller
        .submit_goal(goal, WorkflowType::ComplexOrchestration)
        .await?;

    // Wait past the configured 5 s execution limit.
    tokio::time::sleep(Duration::from_secs(7)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    if let Some(context) = status {
        if matches!(context.current_state, WorkflowState::Executing) {
            assert!(!context.execution_history.is_empty());
        }
    }
    Ok(())
}
/// Sets the stall detector's `global_timeout_seconds` to 2, submits a
/// low-priority `ComplexOrchestration` goal, waits 5 seconds, and checks that
/// the workflow's `created_at` does not come after its `updated_at` when a
/// status is available.
///
/// NOTE(review): no stall-specific field is inspected here.
#[tokio::test]
async fn test_stall_detection_timeout() -> Result<()> {
    let mut config = create_test_planner_config();
    config.stall_config.global_timeout_seconds = 2;
    let controller = PlannerExecutorController::new(config).await?;

    let slow_goal = Goal::new("Complex analysis requiring extended processing time")
        .with_context("Performance testing environment")
        .with_priority(Priority::Low);
    let workflow_id = controller
        .submit_goal(slow_goal, WorkflowType::ComplexOrchestration)
        .await?;

    // Wait past the configured 2 s global timeout.
    tokio::time::sleep(Duration::from_secs(5)).await;

    match controller.get_workflow_status(workflow_id).await? {
        Some(context) => assert!(context.created_at <= context.updated_at),
        None => {}
    }
    Ok(())
}
/// Submits a simple goal, waits 500 ms, and walks any stall detections
/// recorded on the workflow context.
///
/// The `match` has no wildcard arm, so it only compiles while the four listed
/// `RecoveryStrategy` variants cover the enum. No runtime assertion is made
/// about which strategy was chosen.
#[tokio::test]
async fn test_recovery_strategy_execution() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;

    let goal = create_simple_test_goal();
    let workflow_id = controller.submit_goal(goal, WorkflowType::Simple).await?;

    tokio::time::sleep(Duration::from_millis(500)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    if let Some(context) = status {
        for stall_event in &context.stall_detections {
            match stall_event.recovery_strategy {
                // Every listed strategy is accepted as-is.
                RecoveryStrategy::AutoRetry
                | RecoveryStrategy::UserIntervention
                | RecoveryStrategy::Escalate
                | RecoveryStrategy::Fail => {}
            }
        }
    }
    Ok(())
}
/// Submits a simple goal, waits 300 ms, and checks that the JSON telemetry
/// export is non-empty and parses as JSON.
///
/// Returns an error if submission, export, or JSON parsing fails.
#[tokio::test]
async fn test_telemetry_collection() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;

    let goal = create_simple_test_goal();
    // The id is not needed; the submission only has to happen before export.
    let _workflow_id = controller.submit_goal(goal, WorkflowType::Simple).await?;

    tokio::time::sleep(Duration::from_millis(300)).await;

    let telemetry_json = controller.export_telemetry(ExportFormat::Json).await?;
    assert!(!telemetry_json.is_empty());
    let _parsed: serde_json::Value = serde_json::from_str(&telemetry_json)?;
    Ok(())
}
/// Exports telemetry in the JSON and Prometheus formats and applies a
/// format-specific sanity check to each non-empty export.
///
/// Only `Json` and `Prometheus` are requested. The `Csv` and `NatsStream` arms
/// are there so the `match` stays exhaustive over `ExportFormat`.
#[tokio::test]
async fn test_metrics_export_formats() -> Result<()> {
    let controller = PlannerExecutorController::new(create_test_planner_config()).await?;
    let _workflow_id = controller
        .submit_goal(create_simple_test_goal(), WorkflowType::Simple)
        .await?;

    tokio::time::sleep(Duration::from_millis(200)).await;

    let formats = vec![ExportFormat::Json, ExportFormat::Prometheus];
    for format in formats {
        let export = controller.export_telemetry(format.clone()).await?;

        // An empty export is tolerated for every format; nothing to inspect.
        if export.is_empty() {
            continue;
        }

        match format {
            ExportFormat::Json => {
                let _parsed: serde_json::Value = serde_json::from_str(&export)?;
            }
            ExportFormat::Csv => {
                assert!(export.contains(','));
            }
            ExportFormat::Prometheus => {
                assert!(export.contains('#') || export.contains("_total"));
            }
            ExportFormat::NatsStream => {}
        }
    }
    Ok(())
}
/// Submits a simple goal, waits 200 ms, and checks that the workflow's
/// `created_at` does not come after its `updated_at` when a status is
/// available.
#[tokio::test]
async fn test_workflow_metrics_tracking() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;

    let goal = create_simple_test_goal();
    let workflow_id = controller.submit_goal(goal, WorkflowType::Simple).await?;

    tokio::time::sleep(Duration::from_millis(200)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    if let Some(context) = status {
        assert!(context.created_at <= context.updated_at);
    }
    Ok(())
}
/// Raises `max_concurrent_workflows` to 10, submits 20 goals from 20 spawned
/// tasks, and checks that every submission returns an id and that no more
/// than 20 workflows are listed as active afterwards.
///
/// A panicked or cancelled task and a failed submission are both reported as
/// test errors via `?`.
#[tokio::test]
async fn test_high_concurrency_workflows() -> Result<()> {
    let mut config = create_test_planner_config();
    config.execution_config.max_concurrent_workflows = 10;
    let controller = PlannerExecutorController::new(config).await?;

    let mut handles = Vec::new();
    for i in 0..20 {
        // Each task gets its own controller handle and its own goal.
        let controller_clone = controller.clone();
        let goal = Goal::new(format!("Concurrent goal {}", i));
        handles.push(tokio::spawn(async move {
            controller_clone
                .submit_goal(goal, WorkflowType::Simple)
                .await
        }));
    }

    // First `?`: a task failed to join. Second `?`: a submission returned an error.
    let submissions = futures::future::try_join_all(handles).await?;
    let results: Result<Vec<_>, _> = submissions.into_iter().collect();
    let workflow_ids = results?;
    assert_eq!(workflow_ids.len(), 20);

    tokio::time::sleep(Duration::from_millis(500)).await;

    let active_workflows = controller.list_active_workflows().await?;
    assert!(active_workflows.len() <= 20);
    Ok(())
}
/// Submits five batches of ten goals each, pausing 200 ms after every batch,
/// and checks that the number of active workflows never exceeds 50 (the total
/// number of goals submitted).
///
/// NOTE(review): despite the name, no memory measurement is taken; only the
/// active-workflow count is bounded.
#[tokio::test]
async fn test_memory_usage_under_load() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;

    for batch in 0..5 {
        for i in 0..10 {
            let goal = Goal::new(format!("Batch {} Goal {}", batch, i));
            // The returned id is not needed; only the submission matters here.
            let _workflow_id = controller.submit_goal(goal, WorkflowType::Simple).await?;
        }

        tokio::time::sleep(Duration::from_millis(200)).await;

        let active_count = controller.list_active_workflows().await?.len();
        assert!(active_count <= 50);
    }
    Ok(())
}
#[tokio::test]
async fn test_complete_workflow_lifecycle() -> Result<()> {
let config = create_test_planner_config();
let controller = PlannerExecutorController::new(config).await?;
let goal = create_simple_test_goal();
let workflow_id = controller
.submit_goal(goal.clone(), WorkflowType::Simple)
.await?;
let mut lifecycle_states = Vec::new();
let timeout_duration = Duration::from_secs(30);
let start_time = tokio::time::Instant::now();
while start_time.elapsed() < timeout_duration {
tokio::time::sleep(Duration::from_millis(100)).await;
if let Some(status) = controller.get_workflow_status(workflow_id).await? {
if lifecycle_states.is_empty()
|| lifecycle_states.last() != Some(&status.current_state)
{
lifecycle_states.push(status.current_state.clone());
}
if matches!(
status.current_state,
WorkflowState::Completed | WorkflowState::Failed
) {
break;
}
} else {
break;
}
}
assert!(lifecycle_states.len() >= 1);
if let Some(first_state) = lifecycle_states.first() {
assert!(matches!(
first_state,
WorkflowState::Initializing | WorkflowState::Planning
));
}
Ok(())
}
/// Submits two simple goals, waits 100 ms, and then shuts the controller
/// down under a 20 s timeout, requiring the shutdown result to be `Ok`.
///
/// NOTE(review): when the 20 s timeout elapses, `shutdown()` is invoked again
/// with no time bound, so a hanging shutdown would hang this test rather than
/// fail it — confirm whether the fallback is intentional.
#[tokio::test]
async fn test_graceful_shutdown() -> Result<()> {
    let controller = PlannerExecutorController::new(create_test_planner_config()).await?;

    let goal1 = create_simple_test_goal();
    let goal2 = create_simple_test_goal();
    let _workflow_id1 = controller.submit_goal(goal1, WorkflowType::Simple).await?;
    let _workflow_id2 = controller.submit_goal(goal2, WorkflowType::Simple).await?;

    tokio::time::sleep(Duration::from_millis(100)).await;

    let bounded_shutdown = timeout(Duration::from_secs(20), controller.shutdown()).await;
    if let Ok(res) = bounded_shutdown {
        assert!(res.is_ok());
    } else {
        // The bounded attempt timed out; retry without a time bound.
        controller.shutdown().await?;
    }
    Ok(())
}
/// Submits a goal described as invalid, waits 500 ms, and — if the workflow
/// is reported as `Failed` — requires that its execution history is
/// non-empty.
///
/// NOTE(review): no assertion is made for any other state or for a missing
/// status.
#[tokio::test]
async fn test_error_propagation_and_handling() -> Result<()> {
    let config = create_test_planner_config();
    let controller = PlannerExecutorController::new(config).await?;

    let error_goal = Goal::new("Invalid operation with malformed parameters")
        .with_constraints(vec!["Impossible constraint".to_string()])
        .with_success_criteria(vec!["Contradictory requirement".to_string()]);
    let workflow_id = controller
        .submit_goal(error_goal, WorkflowType::Simple)
        .await?;

    tokio::time::sleep(Duration::from_millis(500)).await;

    let status = controller.get_workflow_status(workflow_id).await?;
    if let Some(context) = status {
        if matches!(context.current_state, WorkflowState::Failed) {
            assert!(!context.execution_history.is_empty());
        }
    }
    Ok(())
}
}
use futures;