#![cfg(test)]
#![allow(dead_code)]
#![allow(clippy::unwrap_used, clippy::panic)] #![allow(clippy::cast_precision_loss)] #![allow(clippy::cast_sign_loss)] #![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_possible_wrap)] #![allow(clippy::cast_lossless)] #![allow(clippy::missing_panics_doc)] #![allow(clippy::missing_errors_doc)] #![allow(missing_docs)] #![allow(clippy::items_after_statements)] #![allow(clippy::used_underscore_binding)] #![allow(clippy::needless_pass_by_value)]
use std::collections::HashMap;
mod common;
#[allow(unused_imports)]
use common::{SagaStepDef, SagaStepResult, TestSagaExecutor};
/// Identifier of a single saga execution; a thin newtype over `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct SagaId(u64);

impl SagaId {
    /// Wraps a raw numeric id.
    fn new(id: u64) -> Self {
        SagaId(id)
    }
}
/// Description of one saga step: its name, the service and database it
/// targets, its string key/value input, and the optional name of the
/// compensating action.
#[derive(Debug, Clone)]
struct SagaStep {
    name: String,
    service: String,
    database: String,
    input: HashMap<String, String>,
    compensation_name: Option<String>,
}

impl SagaStep {
    /// Creates a step with empty input and no compensation.
    fn new(name: &str, service: &str, database: &str) -> Self {
        SagaStep {
            name: String::from(name),
            service: String::from(service),
            database: String::from(database),
            input: HashMap::default(),
            compensation_name: None,
        }
    }

    /// Builder-style setter: stores `value` under `key`, replacing any value
    /// previously stored under the same key.
    fn with_input(self, key: &str, value: &str) -> Self {
        let mut step = self;
        step.input.insert(key.to_owned(), value.to_owned());
        step
    }

    /// Builder-style setter: records the name of the compensating action.
    fn with_compensation(self, comp_name: &str) -> Self {
        SagaStep {
            compensation_name: Some(comp_name.to_owned()),
            ..self
        }
    }
}
/// State of an individual saga step.
///
/// The `StepResult` constructors below produce `Completed`, `Failed` and
/// `Compensated`; the remaining variants are never constructed in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StepStatus {
    Pending,
    Completed,
    Failed,
    Compensating,
    Compensated,
}

/// Outcome recorded for one step: the step's name, its status, and either an
/// output map (on success) or an error message (on failure).
#[derive(Debug, Clone)]
struct StepResult {
    step_name: String,
    status: StepStatus,
    result: Option<HashMap<String, String>>,
    error: Option<String>,
}

impl StepResult {
    /// Shared constructor behind the public-facing helpers below.
    fn outcome(
        step_name: &str,
        status: StepStatus,
        result: Option<HashMap<String, String>>,
        error: Option<&str>,
    ) -> Self {
        StepResult {
            step_name: String::from(step_name),
            status,
            result,
            error: error.map(String::from),
        }
    }

    /// A `Completed` result carrying the step's output; no error.
    fn success(step_name: &str, result: HashMap<String, String>) -> Self {
        Self::outcome(step_name, StepStatus::Completed, Some(result), None)
    }

    /// A `Failed` result carrying the error message; no output.
    fn failure(step_name: &str, error: &str) -> Self {
        Self::outcome(step_name, StepStatus::Failed, None, Some(error))
    }

    /// A `Compensated` result with neither output nor error.
    fn compensated(step_name: &str) -> Self {
        Self::outcome(step_name, StepStatus::Compensated, None, None)
    }
}
/// Overall state of a saga execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SagaStatus {
    Pending,
    Running,
    Completed,
    Compensating,
    Compensated,
    DeadLettered,
}

/// In-memory model of one saga run: the planned steps, the results recorded so
/// far, the overall status, a trace id, and placeholder timestamps.
#[derive(Debug, Clone)]
struct SagaExecution {
    id: SagaId,
    name: String,
    steps: Vec<SagaStep>,
    results: Vec<StepResult>,
    status: SagaStatus,
    trace_id: String,
    created_at: u64,
    completed_at: Option<u64>,
}

impl SagaExecution {
    /// Creates an empty saga in the `Pending` state.
    ///
    /// The trace id is derived from the numeric id as `trace_<id>`;
    /// `created_at` is fixed at 0 and `completed_at` starts unset.
    fn new(id: SagaId, name: &str) -> Self {
        let trace_id = format!("trace_{}", id.0);
        SagaExecution {
            id,
            name: name.to_owned(),
            steps: Vec::new(),
            results: Vec::new(),
            status: SagaStatus::Pending,
            trace_id,
            created_at: 0,
            completed_at: None,
        }
    }

    /// Appends a planned step.
    fn add_step(self, step: SagaStep) -> Self {
        let mut saga = self;
        saga.steps.push(step);
        saga
    }

    /// Moves the saga to `Running`.
    fn mark_running(self) -> Self {
        Self {
            status: SagaStatus::Running,
            ..self
        }
    }

    /// Moves the saga to `Completed` and stamps `completed_at` with the
    /// placeholder value 1.
    fn mark_completed(self) -> Self {
        Self {
            status: SagaStatus::Completed,
            completed_at: Some(1),
            ..self
        }
    }

    /// Despite the name, moves the saga to `Compensating` (there is no
    /// `Failed` saga status) and leaves `completed_at` untouched.
    fn mark_failed(self) -> Self {
        Self {
            status: SagaStatus::Compensating,
            ..self
        }
    }

    /// Moves the saga to `Compensated` and stamps `completed_at` with the
    /// placeholder value 1.
    fn mark_compensated(self) -> Self {
        Self {
            status: SagaStatus::Compensated,
            completed_at: Some(1),
            ..self
        }
    }

    /// Moves the saga to `DeadLettered` and stamps `completed_at` with the
    /// placeholder value 1.
    fn mark_deadlettered(self) -> Self {
        Self {
            status: SagaStatus::DeadLettered,
            completed_at: Some(1),
            ..self
        }
    }

    /// Appends a step result; the saga status is not changed.
    fn add_result(self, result: StepResult) -> Self {
        let mut saga = self;
        saga.results.push(result);
        saga
    }

    /// True when the saga is `Completed` and every recorded result is
    /// `Completed` (trivially true for a completed saga with no results).
    fn is_successful(&self) -> bool {
        if self.status != SagaStatus::Completed {
            return false;
        }
        self.results
            .iter()
            .all(|outcome| matches!(outcome.status, StepStatus::Completed))
    }

    /// True when the saga status is `Compensated`.
    fn is_compensated(&self) -> bool {
        matches!(self.status, SagaStatus::Compensated)
    }

    /// Names of all recorded results, in recording order and regardless of
    /// their status.
    fn executed_steps(&self) -> Vec<&str> {
        let mut names = Vec::with_capacity(self.results.len());
        for outcome in &self.results {
            names.push(outcome.step_name.as_str());
        }
        names
    }

    /// The recorded step names in reverse (last-in, first-out) order.
    fn compensation_order(&self) -> Vec<&str> {
        let mut names = self.executed_steps();
        names.reverse();
        names
    }
}
/// A two-step saga keeps its steps in insertion order, preserves each step's
/// name/service/database, and records a compensation for both steps.
#[test]
fn test_saga_two_step_success() {
    let create_order = SagaStep::new("CreateOrder", "OrderService", "orders_db")
        .with_input("orderId", "order_123")
        .with_input("customerId", "customer_456")
        .with_compensation("CancelOrder");
    let reserve_inventory =
        SagaStep::new("ReserveInventory", "InventoryService", "inventory_db")
            .with_input("productId", "product_789")
            .with_input("quantity", "5")
            .with_compensation("ReleaseInventory");

    let saga = SagaExecution::new(SagaId::new(1), "CreateOrderWithInventoryReservation")
        .add_step(create_order)
        .add_step(reserve_inventory);

    assert_eq!(saga.steps.len(), 2);

    // (name, service, database) expected at each position.
    let expected = [
        ("CreateOrder", "OrderService", "orders_db"),
        ("ReserveInventory", "InventoryService", "inventory_db"),
    ];
    for (step, (name, service, database)) in saga.steps.iter().zip(expected) {
        assert_eq!(step.name, name);
        assert_eq!(step.service, service);
        assert_eq!(step.database, database);
        assert!(step.compensation_name.is_some());
    }
}
/// A running saga whose first step succeeded and whose second step failed is
/// not successful, and the failure's error message is preserved.
#[test]
fn test_saga_partial_success() {
    let order_output = HashMap::from([("orderId".to_string(), "order_123".to_string())]);

    let saga = SagaExecution::new(SagaId::new(2), "CreateOrderWithInventoryReservation")
        .add_step(
            SagaStep::new("CreateOrder", "OrderService", "orders_db")
                .with_compensation("CancelOrder"),
        )
        .add_step(
            SagaStep::new("ReserveInventory", "InventoryService", "inventory_db")
                .with_compensation("ReleaseInventory"),
        )
        .mark_running()
        .add_result(StepResult::success("CreateOrder", order_output))
        .add_result(StepResult::failure("ReserveInventory", "Insufficient inventory"));

    // `add_result` never changes the saga status, so after `mark_running` it must
    // be exactly `Running`; also accepting `Compensating` would hide a wrong
    // transition.
    assert_eq!(saga.status, SagaStatus::Running);

    assert_eq!(saga.results.len(), 2);
    assert_eq!(saga.results[0].status, StepStatus::Completed);
    assert_eq!(saga.results[1].status, StepStatus::Failed);
    assert_eq!(saga.results[1].error.as_deref(), Some("Insufficient inventory"));
    assert!(!saga.is_successful());
}
/// After both steps have results and the saga is marked failed, the
/// compensation order is the recorded steps in reverse.
#[test]
fn test_saga_compensation_rollback() {
    let create_order =
        SagaStep::new("CreateOrder", "OrderService", "orders_db").with_compensation("CancelOrder");
    let reserve_inventory =
        SagaStep::new("ReserveInventory", "InventoryService", "inventory_db")
            .with_compensation("ReleaseInventory");

    let saga = SagaExecution::new(SagaId::new(3), "CreateOrderWithInventoryReservation")
        .add_step(create_order)
        .add_step(reserve_inventory)
        .mark_running()
        .add_result(StepResult::success("CreateOrder", HashMap::new()))
        .add_result(StepResult::success("ReserveInventory", HashMap::new()))
        .mark_failed();

    // One comparison covers both the length and the last-in, first-out ordering.
    assert_eq!(saga.compensation_order(), ["ReserveInventory", "CreateOrder"]);
}
/// A saga whose step and compensation both fail can be dead-lettered: the
/// status is `DeadLettered`, a completion time is set, and at least one
/// recorded result carries an error.
#[test]
fn test_saga_deadletter_queue() {
    let create_order =
        SagaStep::new("CreateOrder", "OrderService", "orders_db").with_compensation("CancelOrder");
    let reserve_inventory =
        SagaStep::new("ReserveInventory", "InventoryService", "inventory_db")
            .with_compensation("ReleaseInventory");

    let saga = SagaExecution::new(SagaId::new(4), "CreateOrderWithInventoryReservation")
        .add_step(create_order)
        .add_step(reserve_inventory)
        .mark_running()
        .add_result(StepResult::success("CreateOrder", HashMap::new()))
        .add_result(StepResult::failure("ReserveInventory", "Network timeout"))
        .add_result(StepResult::failure("ReleaseInventory", "Service unreachable"))
        .mark_deadlettered();

    assert_eq!(saga.status, SagaStatus::DeadLettered);
    assert!(saga.completed_at.is_some());

    let every_result_clean = saga.results.iter().all(|outcome| outcome.error.is_none());
    assert!(!every_result_clean);
}
/// A saga with two successful results that is marked completed reports
/// `Completed`, has a completion time, and counts as successful.
///
/// Note: despite the test's name, no observer is involved here; only the
/// terminal state of the saga is checked.
#[test]
fn test_saga_observer_notification() {
    let planned = SagaExecution::new(SagaId::new(5), "CreateOrderWithInventoryReservation")
        .add_step(SagaStep::new("CreateOrder", "OrderService", "orders_db"))
        .add_step(SagaStep::new("ReserveInventory", "InventoryService", "inventory_db"));

    let saga = planned
        .mark_running()
        .add_result(StepResult::success("CreateOrder", HashMap::new()))
        .add_result(StepResult::success("ReserveInventory", HashMap::new()))
        .mark_completed();

    assert_eq!(saga.status, SagaStatus::Completed);
    assert!(saga.completed_at.is_some());
    assert!(saga.is_successful());
}
/// A new saga carries a non-empty `trace_`-prefixed trace id, and its steps
/// keep the service each one was created with, in insertion order.
#[test]
fn test_saga_tracing_context() {
    let saga = SagaExecution::new(SagaId::new(6), "CreateOrderWithInventoryReservation")
        .add_step(SagaStep::new("CreateOrder", "OrderService", "orders_db"))
        .add_step(SagaStep::new("ReserveInventory", "InventoryService", "inventory_db"));

    assert!(!saga.trace_id.is_empty());
    assert!(saga.trace_id.contains("trace_"));

    // Compare the whole sequence at once: a per-index loop over `saga.steps`
    // would pass vacuously if the steps were missing.
    let services: Vec<&str> = saga.steps.iter().map(|step| step.service.as_str()).collect();
    assert_eq!(services, ["OrderService", "InventoryService"]);
}
/// Two sagas built from the same definition but with different ids have
/// distinct ids and trace ids, and each is independently `Running`.
#[test]
fn test_saga_concurrent_execution() {
    // Same single-step definition, parameterised only by the saga id.
    let running_saga = |raw_id: u64| {
        SagaExecution::new(SagaId::new(raw_id), "CreateOrderWithInventoryReservation")
            .add_step(SagaStep::new("CreateOrder", "OrderService", "orders_db"))
            .mark_running()
    };

    let first = running_saga(7);
    let second = running_saga(8);

    assert_ne!(first.id, second.id);
    assert_ne!(first.trace_id, second.trace_id);
    for saga in [&first, &second] {
        assert_eq!(saga.status, SagaStatus::Running);
    }
}
/// Two sagas executed with identical step input (same `orderId`) both succeed
/// and end with the same step status.
#[test]
fn test_saga_idempotency() {
    /// Builds the single-step order saga, records a successful result, and
    /// marks it completed under the given saga id.
    fn completed_order_saga(raw_id: u64) -> SagaExecution {
        let output = HashMap::from([("orderId".to_string(), "order_same_123".to_string())]);
        SagaExecution::new(SagaId::new(raw_id), "CreateOrderWithInventoryReservation")
            .add_step(
                SagaStep::new("CreateOrder", "OrderService", "orders_db")
                    .with_input("orderId", "order_same_123")
                    .with_input("customerId", "customer_456"),
            )
            .mark_running()
            .add_result(StepResult::success("CreateOrder", output))
            .mark_completed()
    }

    let first = completed_order_saga(9);
    let second = completed_order_saga(10);

    assert_eq!(
        first.steps[0].input.get("orderId"),
        second.steps[0].input.get("orderId")
    );
    assert!(first.is_successful());
    assert!(second.is_successful());
    assert_eq!(first.results[0].status, second.results[0].status);
}
/// Executes a two-step saga through `TestSagaExecutor` and checks that both
/// steps report success and come back numbered 1 and 2, in that order.
#[tokio::test]
async fn test_saga_forward_phase_execution() {
    let mut executor = TestSagaExecutor::new();
    let steps = vec![
        SagaStepDef::new(1, "order-service", "orders", serde_json::json!({"orderId": "order_123"}))
            .with_compensation("cancel_order"),
        SagaStepDef::new(
            2,
            "inventory-service",
            "inventory",
            serde_json::json!({"productId": "prod_456"}),
        )
        .with_compensation("release_inventory"),
    ];

    // `steps` is not needed after this call, so it is moved rather than cloned.
    let results = executor
        .execute_saga("saga-123", steps)
        .await
        .unwrap_or_else(|e| panic!("Saga execution should succeed: {e}"));

    assert_eq!(results.len(), 2, "Should have executed 2 steps");
    assert!(results[0].success, "First step should succeed");
    assert!(results[1].success, "Second step should succeed");
    assert_eq!(results[0].step_number, 1);
    assert_eq!(results[1].step_number, 2);
}
/// Hands `TestSagaExecutor::verify_lifo_order` three forward steps (1, 2, 3)
/// and compensation results listed as 3, 2, 1, and expects it to return `Ok`.
#[tokio::test]
async fn test_saga_lifo_compensation_order() {
    let executor = TestSagaExecutor::new();

    // Forward direction: order, then inventory, then payment.
    let forward_steps = vec![
        SagaStepDef::new(1, "order-service", "orders", serde_json::json!({}))
            .with_compensation("cancel_order"),
        SagaStepDef::new(2, "inventory-service", "inventory", serde_json::json!({}))
            .with_compensation("release_inventory"),
        SagaStepDef::new(3, "payment-service", "payments", serde_json::json!({}))
            .with_compensation("refund_payment"),
    ];

    // Compensations listed in the opposite direction: payment, inventory, order.
    let compensation_results = vec![
        SagaStepResult::success(3, serde_json::json!({"refunded": true})),
        SagaStepResult::success(2, serde_json::json!({"released": true})),
        SagaStepResult::success(1, serde_json::json!({"cancelled": true})),
    ];

    match executor.verify_lifo_order(&forward_steps, &compensation_results) {
        Ok(_) => {}
        Err(violation) => panic!("Compensation order verification failed: {violation}"),
    }
}
/// Executes a three-step saga through `TestSagaExecutor` and checks that every
/// result reports success, carries data, and is numbered 1..=3 in order.
#[tokio::test]
async fn test_multi_step_saga_execution() {
    let mut executor = TestSagaExecutor::new();

    let order_step = SagaStepDef::new(
        1,
        "order-service",
        "orders",
        serde_json::json!({"customerId": "cust_123"}),
    );
    let inventory_step = SagaStepDef::new(
        2,
        "inventory-service",
        "inventory",
        serde_json::json!({"productId": "prod_456", "quantity": 5}),
    );
    let payment_step =
        SagaStepDef::new(3, "payment-service", "payments", serde_json::json!({"amount": 100.00}));

    let outcome = executor
        .execute_saga("multi-saga", vec![order_step, inventory_step, payment_step])
        .await;
    let results = match outcome {
        Ok(results) => results,
        Err(e) => panic!("expected Ok executing multi-step saga: {e}"),
    };

    assert_eq!(results.len(), 3);
    // Step numbers are 1-based, so pair each result with a counter starting at 1.
    for (expected_number, result) in (1_usize..).zip(results.iter()) {
        assert!(result.success);
        assert_eq!(result.step_number, expected_number);
        assert!(result.data.is_some());
    }
}