#[actix_rt::test]
async fn test_executor_creation() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry);
assert!(executor.monitor.is_none());
}
#[test]
fn test_backoff_calculation() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry);
let fixed = BackoffStrategy::Fixed {
delay: Duration::from_secs(5),
};
assert_eq!(
executor.calculate_backoff(&fixed, 1),
Duration::from_secs(5)
);
assert_eq!(
executor.calculate_backoff(&fixed, 3),
Duration::from_secs(5)
);
let exp = BackoffStrategy::Exponential {
initial: Duration::from_secs(1),
multiplier: 2.0,
max: Duration::from_secs(10),
};
assert_eq!(executor.calculate_backoff(&exp, 1), Duration::from_secs(1));
assert_eq!(executor.calculate_backoff(&exp, 2), Duration::from_secs(2));
assert_eq!(executor.calculate_backoff(&exp, 3), Duration::from_secs(4));
let linear = BackoffStrategy::Linear {
initial: Duration::from_secs(1),
increment: Duration::from_secs(2),
};
assert_eq!(
executor.calculate_backoff(&linear, 1),
Duration::from_secs(1)
);
assert_eq!(
executor.calculate_backoff(&linear, 2),
Duration::from_secs(3)
);
assert_eq!(
executor.calculate_backoff(&linear, 3),
Duration::from_secs(5)
);
}
#[actix_rt::test]
async fn test_dag_integration_parallel_execution() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry.clone());
let workflow = WorkflowBuilder::new("parallel_test")
.add_step(StepBuilder::action("step1", "Init", "agent1", "init").build())
.add_step(StepBuilder::action("step2", "Process A", "agent2", "process").build())
.add_step(StepBuilder::action("step3", "Process B", "agent3", "process").build())
.build();
let context = WorkflowContext::new(workflow.id, registry);
let result = executor.execute(&workflow, &context).await;
assert!(result.is_err()); match result {
Err(WorkflowError::AgentError(_)) => {
}
other => panic!("Expected AgentError, got: {:?}", other),
}
}
#[actix_rt::test]
async fn test_pause_resume_workflow() {
let registry = Arc::new(AgentRegistry::new());
let executor = Arc::new(DefaultWorkflowExecutor::new(registry.clone()));
let workflow = WorkflowBuilder::new("pause_test")
.add_step(StepBuilder::action("step1", "Step 1", "agent1", "op1").build())
.add_step(StepBuilder::action("step2", "Step 2", "agent2", "op2").build())
.build();
let context = WorkflowContext::new(workflow.id, registry);
let _execution_id = context.execution_id;
let executor_clone = executor.clone();
let workflow_clone = workflow.clone();
let context_clone = WorkflowContext::new(workflow_clone.id, context.agent_registry.clone());
let context_clone_id = context_clone.execution_id;
tokio::spawn(async move {
let _ = executor_clone
.execute(&workflow_clone, &context_clone)
.await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let pause_result = executor.pause(context_clone_id).await;
assert!(pause_result.is_ok());
let control = executor
.check_execution_control(context_clone_id)
.expect("internal error");
assert_eq!(control, ExecutionControl::Paused);
let resume_result = executor.resume(context_clone_id).await;
assert!(resume_result.is_ok());
}
#[actix_rt::test]
async fn test_cancel_workflow() {
let registry = Arc::new(AgentRegistry::new());
let executor = Arc::new(DefaultWorkflowExecutor::new(registry.clone()));
let workflow = WorkflowBuilder::new("cancel_test")
.add_step(StepBuilder::action("step1", "Step 1", "agent1", "op1").build())
.build();
let context = WorkflowContext::new(workflow.id, registry);
let _execution_id = context.execution_id;
let executor_clone = executor.clone();
let workflow_clone = workflow.clone();
let context_clone = WorkflowContext::new(workflow_clone.id, context.agent_registry.clone());
let context_clone_id = context_clone.execution_id;
tokio::spawn(async move {
let _ = executor_clone
.execute(&workflow_clone, &context_clone)
.await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let cancel_result = executor.cancel(context_clone_id).await;
assert!(cancel_result.is_ok());
let control = executor
.check_execution_control(context_clone_id)
.expect("internal error");
assert_eq!(control, ExecutionControl::Cancelled);
}
#[actix_rt::test]
async fn test_workflow_with_retry() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry.clone());
let workflow = WorkflowBuilder::new("retry_test")
.add_step(
StepBuilder::action("step1", "Flaky Step", "agent1", "flaky_op")
.retry(
3,
BackoffStrategy::Exponential {
initial: Duration::from_millis(10),
multiplier: 2.0,
max: Duration::from_millis(100),
},
)
.build(),
)
.build();
let context = WorkflowContext::new(workflow.id, registry);
let result = executor.execute(&workflow, &context).await;
assert!(result.is_err());
match result {
Err(WorkflowError::AgentError(_)) => {
}
other => panic!("Expected AgentError, got: {:?}", other),
}
}
#[actix_rt::test]
async fn test_workflow_error_strategies() {
let registry = Arc::new(AgentRegistry::new());
let executor_continue = DefaultWorkflowExecutor::new(registry.clone());
let workflow_continue = WorkflowBuilder::new("continue_test")
.error_strategy(ErrorStrategy::Continue)
.add_step(StepBuilder::action("step1", "Step 1", "nonexistent_agent", "op1").build())
.build();
let context_continue = WorkflowContext::new(workflow_continue.id, registry.clone());
let result_continue = executor_continue
.execute(&workflow_continue, &context_continue)
.await;
assert!(result_continue.is_ok());
let executor_failfast = DefaultWorkflowExecutor::new(registry.clone());
let workflow_failfast = WorkflowBuilder::new("failfast_test")
.error_strategy(ErrorStrategy::FailFast)
.add_step(StepBuilder::action("step1", "Step 1", "nonexistent_agent", "op1").build())
.build();
let context_failfast = WorkflowContext::new(workflow_failfast.id, registry);
let result_failfast = executor_failfast
.execute(&workflow_failfast, &context_failfast)
.await;
assert!(result_failfast.is_err());
}
#[actix_rt::test]
async fn test_workflow_monitoring() {
let registry = Arc::new(AgentRegistry::new());
let monitor = Arc::new(super::super::monitoring::DefaultWorkflowMonitor::new());
let executor = DefaultWorkflowExecutor::new(registry.clone()).with_monitor(monitor.clone());
let workflow = WorkflowBuilder::new("monitor_test")
.add_step(StepBuilder::action("step1", "Step 1", "agent1", "op1").build())
.build();
let context = WorkflowContext::new(workflow.id, registry);
let execution_id = context.execution_id;
let _result = executor.execute(&workflow, &context).await;
let metrics = monitor.get_metrics(execution_id).await;
assert_eq!(metrics.execution_id, execution_id);
assert!(metrics.total_steps > 0); }
#[actix_rt::test]
async fn test_checkpoint_creation() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry.clone());
let workflow = WorkflowBuilder::new("checkpoint_test")
.add_step(StepBuilder::action("step1", "Step 1", "agent1", "op1").build())
.add_step(StepBuilder::action("step2", "Step 2", "agent2", "op2").build())
.build();
let context = WorkflowContext::new(workflow.id, registry);
let execution_id = context.execution_id;
let execution_order = vec![vec!["step1".to_string()], vec!["step2".to_string()]];
let checkpoint_result = executor.save_checkpoint(execution_id, 1, &execution_order);
assert!(checkpoint_result.is_ok());
}
#[actix_rt::test]
async fn test_parallel_step_execution() {
let registry = Arc::new(AgentRegistry::new());
let executor = DefaultWorkflowExecutor::new(registry.clone());
let parallel_steps = vec![
StepBuilder::action("parallel1", "Parallel 1", "agent1", "op1").build(),
StepBuilder::action("parallel2", "Parallel 2", "agent2", "op2").build(),
];
let context = WorkflowContext::new(Uuid::new_v4(), registry);
context.set_state(WorkflowState::Running);
let result = executor.execute_parallel(¶llel_steps, &context).await;
assert!(result.is_err());
if let Err(e) = result {
match e {
WorkflowError::AgentError(_) => {
}
other => panic!("Expected AgentError, got: {:?}", other),
}
}
}
async fn setup_executor_with_agent() -> (DefaultWorkflowExecutor, Arc<AgentRegistry>) {
let registry = Arc::new(AgentRegistry::new());
let agent_id = Uuid::new_v4();
let spec = AgentSpec {
id: agent_id,
class: AgentClass::Analyzer,
config: serde_json::json!({}),
};
registry.spawn_agent(spec).await.unwrap();
registry
.register_agent_with_name("test_agent", agent_id)
.await;
let executor = DefaultWorkflowExecutor::new(registry.clone());
(executor, registry)
}