#![allow(clippy::cast_possible_truncation)] #![allow(clippy::unwrap_used)] use fraiseql_core::federation::{
saga_compensator::SagaCompensator,
saga_coordinator::{CompensationStrategy, SagaCoordinator},
saga_executor::SagaExecutor,
};
use super::common;
#[tokio::test]
async fn test_saga_with_5_steps_all_succeed() {
let scenario = common::TestSagaScenario::new(5);
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
common::execute_all_steps(saga_id, 5).await;
assert_eq!(saga_id.get_version_num(), 4); }
#[tokio::test]
async fn test_saga_with_7_steps_all_succeed() {
let scenario = common::TestSagaScenario::new(7);
let (steps, saga_id) = common::execute_saga_scenario(scenario).await;
common::execute_all_steps(saga_id, steps.len()).await;
assert_eq!(steps.len(), 7);
}
#[tokio::test]
async fn test_saga_with_10_steps_all_succeed() {
let scenario = common::TestSagaScenario::new(10);
let (steps, saga_id) = common::execute_saga_scenario(scenario).await;
common::execute_all_steps(saga_id, steps.len()).await;
assert_eq!(steps.len(), 10);
}
#[tokio::test]
async fn test_saga_execution_preserves_step_order() {
let scenario = common::TestSagaScenario::new(5);
let (steps, saga_id) = common::execute_saga_scenario(scenario).await;
common::execute_all_steps(saga_id, steps.len()).await;
for (i, step) in steps.iter().enumerate() {
assert_eq!(step.number, (i + 1) as u32);
}
}
#[tokio::test]
async fn test_each_step_receives_previous_step_output() {
let scenario = common::TestSagaScenario::new(3);
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
let executor = SagaExecutor::new();
let result1 = executor
.execute_step(saga_id, 1, "mutation1", &serde_json::json!({"step": 1}), "service-1")
.await;
let step1_data = result1.unwrap_or_else(|e| panic!("execute_step(1) failed: {e}")).data;
assert!(step1_data.is_some());
let result2 = executor
.execute_step(
saga_id,
2,
"mutation2",
&serde_json::json!({"step": 2, "prev_step_data": step1_data}),
"service-2",
)
.await;
assert!(result2.unwrap_or_else(|e| panic!("execute_step(2) failed: {e}")).data.is_some());
}
#[tokio::test]
async fn test_saga_result_contains_all_step_data() {
let scenario = common::TestSagaScenario::new(4);
let (steps, saga_id) = common::execute_saga_scenario(scenario).await;
let executor = SagaExecutor::new();
let mut step_results = vec![];
for step_number in 1..=steps.len() as u32 {
let result = executor
.execute_step(
saga_id,
step_number,
&format!("mutation{}", step_number),
&serde_json::json!({"step": step_number}),
&format!("service-{}", step_number % 3 + 1),
)
.await;
step_results
.push(result.unwrap_or_else(|e| panic!("execute_step({step_number}) failed: {e}")));
}
assert_eq!(step_results.len(), 4);
for (i, result) in step_results.iter().enumerate() {
assert_eq!(result.step_number, (i + 1) as u32);
assert!(result.success);
assert!(result.data.is_some());
}
}
#[tokio::test]
async fn test_concurrent_5_sagas_execute_independently() {
let saga_count = 5;
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
let mut saga_ids = vec![];
for _ in 0..saga_count {
let steps = common::TestSagaScenario::new(3).build_steps();
let saga_id = coordinator.create_saga(steps).await.expect("Failed to create saga");
saga_ids.push(saga_id);
}
for saga_id in &saga_ids {
common::execute_all_steps(*saga_id, 3).await;
}
assert_eq!(saga_ids.len(), saga_count);
for i in 0..saga_ids.len() {
for j in (i + 1)..saga_ids.len() {
assert_ne!(saga_ids[i], saga_ids[j], "Saga IDs should be unique");
}
}
}
#[tokio::test]
async fn test_concurrent_10_sagas_execute_independently() {
let saga_count = 10;
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
let mut saga_ids = vec![];
for _ in 0..saga_count {
let steps = common::TestSagaScenario::new(2).build_steps();
let saga_id = coordinator.create_saga(steps).await.expect("Failed to create saga");
saga_ids.push(saga_id);
}
for saga_id in &saga_ids {
common::execute_all_steps(*saga_id, 2).await;
}
assert_eq!(saga_ids.len(), saga_count);
for i in 0..saga_ids.len() {
for j in (i + 1)..saga_ids.len() {
assert_ne!(saga_ids[i], saga_ids[j]);
}
}
}
#[tokio::test]
async fn test_10_concurrent_sagas_execute_independently() {
let mut saga_ids = Vec::new();
for i in 0..10 {
let scenario = common::TestSagaScenario::new(3 + (i % 3));
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
saga_ids.push(saga_id);
}
for (i, saga_id) in saga_ids.iter().enumerate() {
common::execute_all_steps(*saga_id, 3 + (i % 3)).await;
}
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
for saga_id in saga_ids {
let status = coordinator.get_saga_status(saga_id).await.expect("Failed to get saga status");
assert_eq!(status.saga_id, saga_id);
}
}
#[tokio::test]
async fn test_50_concurrent_sagas_execute_independently() {
let mut saga_ids = Vec::new();
for i in 0..50 {
let step_count = 2 + (i % 4);
let scenario = common::TestSagaScenario::new(step_count);
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
saga_ids.push((saga_id, step_count));
}
for (saga_id, step_count) in &saga_ids {
common::execute_all_steps(*saga_id, *step_count).await;
}
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
for (saga_id, _) in saga_ids {
let status = coordinator.get_saga_status(saga_id).await.expect("Failed to get saga status");
assert_eq!(status.saga_id, saga_id);
}
}
#[tokio::test]
async fn test_concurrent_sagas_with_different_strategies() {
let mut auto_sagas = Vec::new();
let mut manual_sagas = Vec::new();
for _ in 0..5 {
let auto_scenario =
common::TestSagaScenario::new(3).with_strategy(CompensationStrategy::Automatic);
let (_, auto_id) = common::execute_saga_scenario(auto_scenario).await;
auto_sagas.push(auto_id);
let manual_scenario =
common::TestSagaScenario::new(3).with_strategy(CompensationStrategy::Manual);
let (_, manual_id) = common::execute_saga_scenario(manual_scenario).await;
manual_sagas.push(manual_id);
}
for saga_id in &auto_sagas {
common::execute_all_steps(*saga_id, 3).await;
}
for saga_id in &manual_sagas {
common::execute_all_steps(*saga_id, 3).await;
}
let coordinator_auto = SagaCoordinator::new(CompensationStrategy::Automatic);
let coordinator_manual = SagaCoordinator::new(CompensationStrategy::Manual);
for saga_id in auto_sagas {
let status = coordinator_auto
.get_saga_status(saga_id)
.await
.expect("Failed to get auto saga status");
assert_eq!(status.saga_id, saga_id);
}
for saga_id in manual_sagas {
let status = coordinator_manual
.get_saga_status(saga_id)
.await
.expect("Failed to get manual saga status");
assert_eq!(status.saga_id, saga_id);
}
}
#[tokio::test]
async fn test_concurrent_sagas_some_fail_some_succeed() {
let mut sagas = Vec::new();
for i in 0..10 {
let scenario = common::TestSagaScenario::new(5);
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
sagas.push((saga_id, i % 2 == 0)); }
for (saga_id, should_fail) in &sagas {
if *should_fail {
common::execute_all_steps_with_failure(*saga_id, 5, Some(3)).await;
} else {
common::execute_all_steps(*saga_id, 5).await;
}
}
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
for (saga_id, should_fail) in sagas {
let status = coordinator.get_saga_status(saga_id).await.expect("Failed to get saga status");
assert_eq!(status.saga_id, saga_id);
let _ = should_fail;
}
}
#[tokio::test]
async fn test_in_flight_saga_list_accurate_during_concurrent_execution() {
let coordinator = SagaCoordinator::new(CompensationStrategy::Automatic);
let mut saga_ids = Vec::new();
for i in 0..8 {
let scenario = common::TestSagaScenario::new(2 + (i % 2));
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
saga_ids.push(saga_id);
}
let in_flight = coordinator
.list_in_flight_sagas()
.await
.expect("Failed to list in-flight sagas");
let _ = in_flight;
assert!(!saga_ids.is_empty(), "Successfully created 8 sagas");
}
#[tokio::test]
async fn test_concurrent_compensation_does_not_interfere() {
let mut saga_ids = Vec::new();
for _ in 0..5 {
let scenario =
common::TestSagaScenario::new(4).with_strategy(CompensationStrategy::Automatic);
let (_, saga_id) = common::execute_saga_scenario(scenario).await;
saga_ids.push(saga_id);
}
for saga_id in &saga_ids {
common::execute_all_steps_with_failure(*saga_id, 4, Some(2)).await;
common::execute_compensation(*saga_id, 1).await;
}
let compensator = SagaCompensator::new();
for saga_id in saga_ids {
let comp_status = compensator
.get_compensation_status(saga_id)
.await
.expect("Failed to get compensation status");
let _ = comp_status;
}
}