use std::collections::{HashMap, HashSet};
use aws_sdk_lambda::types::{Operation, OperationStatus};
use crate::operation_id::OperationIdGenerator;
use crate::types::ExecutionMode;
pub struct ReplayEngine {
operations: HashMap<String, Operation>,
visited: HashSet<String>,
completed_ids: HashSet<String>,
mode: ExecutionMode,
id_generator: OperationIdGenerator,
}
fn is_completed_status(status: &OperationStatus) -> bool {
matches!(
status,
OperationStatus::Succeeded
| OperationStatus::Failed
| OperationStatus::Cancelled
| OperationStatus::TimedOut
| OperationStatus::Stopped
)
}
impl ReplayEngine {
pub fn new(operations: HashMap<String, Operation>, parent_id: Option<String>) -> Self {
let completed_ids: HashSet<String> = operations
.iter()
.filter(|(_, op)| {
is_completed_status(&op.status)
&& op.r#type != aws_sdk_lambda::types::OperationType::Execution
})
.map(|(id, _)| id.clone())
.collect();
let mode = if completed_ids.is_empty() {
ExecutionMode::Executing
} else {
ExecutionMode::Replaying
};
Self {
operations,
visited: HashSet::new(),
completed_ids,
mode,
id_generator: OperationIdGenerator::new(parent_id),
}
}
pub fn check_result(&self, operation_id: &str) -> Option<&Operation> {
self.operations
.get(operation_id)
.filter(|op| is_completed_status(&op.status))
}
pub fn track_replay(&mut self, operation_id: &str) {
self.visited.insert(operation_id.to_string());
if self.mode == ExecutionMode::Replaying && self.completed_ids.is_subset(&self.visited) {
self.mode = ExecutionMode::Executing;
}
}
pub fn is_replaying(&self) -> bool {
self.mode == ExecutionMode::Replaying
}
pub fn execution_mode(&self) -> ExecutionMode {
self.mode.clone()
}
pub fn generate_operation_id(&mut self) -> String {
self.id_generator.next_id()
}
pub fn get_operation(&self, operation_id: &str) -> Option<&Operation> {
self.operations.get(operation_id)
}
pub fn operations(&self) -> &HashMap<String, Operation> {
&self.operations
}
pub fn insert_operation(&mut self, id: String, operation: Operation) {
if is_completed_status(&operation.status)
&& operation.r#type != aws_sdk_lambda::types::OperationType::Execution
{
self.completed_ids.insert(id.clone());
}
self.operations.insert(id, operation);
}
}
#[cfg(test)]
mod tests {
use super::*;
use aws_sdk_lambda::types::{Operation, OperationStatus, OperationType};
fn make_operation(id: &str, status: OperationStatus, op_type: OperationType) -> Operation {
Operation::builder()
.id(id)
.r#type(op_type)
.status(status)
.start_timestamp(aws_smithy_types::DateTime::from_secs(0))
.build()
.unwrap()
}
#[test]
fn empty_history_starts_executing() {
let engine = ReplayEngine::new(HashMap::new(), None);
assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
assert!(!engine.is_replaying());
}
#[test]
fn completed_operations_start_replaying() {
let mut ops = HashMap::new();
ops.insert(
"op1".to_string(),
make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
);
let engine = ReplayEngine::new(ops, None);
assert_eq!(engine.execution_mode(), ExecutionMode::Replaying);
assert!(engine.is_replaying());
}
#[test]
fn only_pending_operations_start_executing() {
let mut ops = HashMap::new();
ops.insert(
"op1".to_string(),
make_operation("op1", OperationStatus::Pending, OperationType::Step),
);
let engine = ReplayEngine::new(ops, None);
assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
}
#[test]
fn execution_type_excluded_from_replay_tracking() {
let mut ops = HashMap::new();
ops.insert(
"exec".to_string(),
make_operation("exec", OperationStatus::Succeeded, OperationType::Execution),
);
let engine = ReplayEngine::new(ops, None);
assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
}
#[test]
fn transitions_to_executing_after_all_visited() {
let mut ops = HashMap::new();
ops.insert(
"op1".to_string(),
make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
);
ops.insert(
"op2".to_string(),
make_operation("op2", OperationStatus::Failed, OperationType::Step),
);
let mut engine = ReplayEngine::new(ops, None);
assert!(engine.is_replaying());
engine.track_replay("op1");
assert!(engine.is_replaying());
engine.track_replay("op2");
assert!(!engine.is_replaying()); assert_eq!(engine.execution_mode(), ExecutionMode::Executing);
}
#[test]
fn check_result_returns_completed_operations() {
let mut ops = HashMap::new();
ops.insert(
"op1".to_string(),
make_operation("op1", OperationStatus::Succeeded, OperationType::Step),
);
ops.insert(
"op2".to_string(),
make_operation("op2", OperationStatus::Pending, OperationType::Step),
);
let engine = ReplayEngine::new(ops, None);
assert!(engine.check_result("op1").is_some());
assert!(engine.check_result("op2").is_none()); assert!(engine.check_result("op3").is_none()); }
#[test]
fn generate_operation_id_is_deterministic() {
let mut engine1 = ReplayEngine::new(HashMap::new(), None);
let mut engine2 = ReplayEngine::new(HashMap::new(), None);
let id1a = engine1.generate_operation_id();
let id1b = engine2.generate_operation_id();
assert_eq!(id1a, id1b);
let id2a = engine1.generate_operation_id();
let id2b = engine2.generate_operation_id();
assert_eq!(id2a, id2b);
assert_ne!(id1a, id2a);
}
#[test]
fn mixed_statuses_only_track_completed() {
let mut ops = HashMap::new();
ops.insert(
"done".to_string(),
make_operation("done", OperationStatus::Succeeded, OperationType::Step),
);
ops.insert(
"pending".to_string(),
make_operation("pending", OperationStatus::Pending, OperationType::Wait),
);
ops.insert(
"started".to_string(),
make_operation("started", OperationStatus::Started, OperationType::Step),
);
let mut engine = ReplayEngine::new(ops, None);
assert!(engine.is_replaying());
engine.track_replay("done");
assert!(!engine.is_replaying());
}
#[test]
fn all_completed_statuses_are_tracked() {
for status in [
OperationStatus::Succeeded,
OperationStatus::Failed,
OperationStatus::Cancelled,
OperationStatus::TimedOut,
OperationStatus::Stopped,
] {
let mut ops = HashMap::new();
ops.insert(
"op".to_string(),
make_operation("op", status, OperationType::Step),
);
let engine = ReplayEngine::new(ops, None);
assert!(engine.is_replaying(), "Should replay for completed status");
}
}
#[test]
fn insert_operation_updates_state() {
let mut engine = ReplayEngine::new(HashMap::new(), None);
assert!(!engine.is_replaying());
let op = make_operation("new_op", OperationStatus::Succeeded, OperationType::Step);
engine.insert_operation("new_op".to_string(), op);
assert!(engine.check_result("new_op").is_some());
}
}