use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, PartialEq)]
pub struct StepFailure {
pub step_id: String,
pub step_index: usize,
pub attempts: u32,
pub reason: String,
}
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowResult {
Success {
steps_executed: usize,
total_retries: u32,
},
Failed {
failure: StepFailure,
last_checkpoint: Option<usize>,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StepAction {
Click(String),
Type(String, String),
Wait(String),
Assert(String),
Checkpoint,
}
#[derive(Debug, Clone)]
pub struct DurableStep {
pub id: String,
pub action: StepAction,
pub max_retries: u32,
pub timeout_ms: u64,
}
impl DurableStep {
#[must_use]
pub fn new(id: impl Into<String>, action: StepAction) -> Self {
Self {
id: id.into(),
action,
max_retries: 2,
timeout_ms: 5_000,
}
}
#[must_use]
pub fn checkpoint(id: impl Into<String>) -> Self {
Self {
id: id.into(),
action: StepAction::Checkpoint,
max_retries: 1,
timeout_ms: 1_000,
}
}
#[must_use]
pub fn with_retries(id: impl Into<String>, action: StepAction, max_retries: u32) -> Self {
Self {
id: id.into(),
action,
max_retries,
timeout_ms: 5_000,
}
}
#[must_use]
pub fn with_config(
id: impl Into<String>,
action: StepAction,
max_retries: u32,
timeout_ms: u64,
) -> Self {
Self {
id: id.into(),
action,
max_retries,
timeout_ms,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
pub step_index: usize,
pub timestamp_ms: u64,
pub state_hash: u64,
}
impl Checkpoint {
#[must_use]
pub fn new(step_index: usize, state_hash: u64) -> Self {
Self {
step_index,
timestamp_ms: current_timestamp_ms(),
state_hash,
}
}
}
pub trait StepExecutor {
fn execute(&mut self, action: &StepAction) -> Result<(), String>;
}
pub struct DurableRunner {
current_step: usize,
checkpoints: Vec<Checkpoint>,
total_retries: u32,
}
impl DurableRunner {
#[must_use]
pub fn new() -> Self {
Self {
current_step: 0,
checkpoints: Vec::new(),
total_retries: 0,
}
}
pub fn run(
&mut self,
steps: &[DurableStep],
executor: &mut dyn StepExecutor,
) -> WorkflowResult {
while self.current_step < steps.len() {
let step = &steps[self.current_step];
match self.execute_with_retry(step, executor) {
Ok(retries_used) => {
self.total_retries += retries_used;
if step.action == StepAction::Checkpoint {
self.save_checkpoint(self.current_step, steps);
}
self.current_step += 1;
}
Err(reason) => {
let failure = StepFailure {
step_id: step.id.clone(),
step_index: self.current_step,
attempts: step.max_retries + 1,
reason,
};
let last_checkpoint = self.checkpoints.last().map(|c| c.step_index);
return WorkflowResult::Failed {
failure,
last_checkpoint,
};
}
}
}
WorkflowResult::Success {
steps_executed: self.current_step,
total_retries: self.total_retries,
}
}
pub fn resume_from_checkpoint(&mut self, checkpoint: &Checkpoint) {
self.current_step = checkpoint.step_index + 1;
}
#[must_use]
pub fn current_step(&self) -> usize {
self.current_step
}
#[must_use]
pub fn checkpoints(&self) -> &[Checkpoint] {
&self.checkpoints
}
#[must_use]
pub fn last_checkpoint(&self) -> Option<&Checkpoint> {
self.checkpoints.last()
}
pub fn reset(&mut self) {
self.current_step = 0;
self.checkpoints.clear();
self.total_retries = 0;
}
fn execute_with_retry(
&self,
step: &DurableStep,
executor: &mut dyn StepExecutor,
) -> Result<u32, String> {
let mut last_err = String::new();
for attempt in 0..=step.max_retries {
match executor.execute(&step.action) {
Ok(()) => return Ok(attempt),
Err(e) => last_err = e,
}
}
Err(last_err)
}
fn save_checkpoint(&mut self, step_index: usize, steps: &[DurableStep]) {
let hash = hash_step_ids(steps);
self.checkpoints.push(Checkpoint::new(step_index, hash));
}
}
impl Default for DurableRunner {
fn default() -> Self {
Self::new()
}
}
fn hash_step_ids(steps: &[DurableStep]) -> u64 {
let mut hasher = DefaultHasher::new();
for step in steps {
step.id.hash(&mut hasher);
}
hasher.finish()
}
fn current_timestamp_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}
pub struct MockExecutor {
results: Vec<Result<(), String>>,
received: Vec<StepAction>,
}
impl MockExecutor {
#[must_use]
pub fn always_ok() -> Self {
Self {
results: Vec::new(),
received: Vec::new(),
}
}
#[must_use]
pub fn from_results(results: Vec<Result<(), String>>) -> Self {
Self {
results,
received: Vec::new(),
}
}
#[must_use]
pub fn received(&self) -> &[StepAction] {
&self.received
}
}
impl StepExecutor for MockExecutor {
fn execute(&mut self, action: &StepAction) -> Result<(), String> {
self.received.push(action.clone());
if self.results.is_empty() {
Ok(())
} else {
self.results.remove(0)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn click_step(id: &str, target: &str) -> DurableStep {
DurableStep::new(id, StepAction::Click(target.into()))
}
fn three_step_workflow() -> Vec<DurableStep> {
vec![
click_step("step-a", "File"),
DurableStep::checkpoint("cp-1"),
click_step("step-b", "Save"),
]
}
#[test]
fn durable_step_new_sets_default_retries_and_timeout() {
let step = DurableStep::new("s", StepAction::Click("btn".into()));
assert_eq!(step.max_retries, 2);
assert_eq!(step.timeout_ms, 5_000);
}
#[test]
fn durable_step_checkpoint_factory_produces_checkpoint_action() {
let step = DurableStep::checkpoint("cp");
assert_eq!(step.action, StepAction::Checkpoint);
assert_eq!(step.id, "cp");
}
#[test]
fn durable_step_with_retries_overrides_retry_count() {
let step = DurableStep::with_retries("s", StepAction::Click("x".into()), 5);
assert_eq!(step.max_retries, 5);
}
#[test]
fn durable_step_with_config_stores_all_fields() {
let step = DurableStep::with_config(
"s",
StepAction::Type("field".into(), "hello".into()),
3,
2_000,
);
assert_eq!(step.max_retries, 3);
assert_eq!(step.timeout_ms, 2_000);
}
#[test]
fn successful_run_returns_correct_step_count() {
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::always_ok();
let steps = three_step_workflow();
let result = runner.run(&steps, &mut exec);
assert_eq!(
result,
WorkflowResult::Success {
steps_executed: 3,
total_retries: 0
}
);
}
#[test]
fn failed_step_returns_failure_with_correct_id() {
let steps = three_step_workflow();
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::from_results(vec![
Ok(()), Ok(()), Err("not found".into()), Err("not found".into()), Err("not found".into()), ]);
let result = runner.run(&steps, &mut exec);
match result {
WorkflowResult::Failed { failure, .. } => {
assert_eq!(failure.step_id, "step-b");
assert_eq!(failure.step_index, 2);
}
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn failure_result_includes_last_checkpoint_index() {
let steps = three_step_workflow();
let mut runner = DurableRunner::new();
let fail_all: Vec<Result<(), String>> = (0..10)
.map(|i| if i < 2 { Ok(()) } else { Err("err".into()) })
.collect();
let mut exec = MockExecutor::from_results(fail_all);
let result = runner.run(&steps, &mut exec);
match result {
WorkflowResult::Failed {
last_checkpoint, ..
} => {
assert_eq!(last_checkpoint, Some(1));
}
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn step_succeeds_on_second_attempt_after_one_failure() {
let steps = vec![DurableStep::with_retries(
"flaky",
StepAction::Click("btn".into()),
1,
)];
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::from_results(vec![Err("transient".into()), Ok(())]);
let result = runner.run(&steps, &mut exec);
assert_eq!(
result,
WorkflowResult::Success {
steps_executed: 1,
total_retries: 1
}
);
}
#[test]
fn zero_retries_fails_immediately_on_first_error() {
let steps = vec![DurableStep::with_retries(
"strict",
StepAction::Click("x".into()),
0,
)];
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::from_results(vec![Err("boom".into())]);
let result = runner.run(&steps, &mut exec);
match result {
WorkflowResult::Failed { failure, .. } => assert_eq!(failure.attempts, 1),
other => panic!("expected Failed, got {other:?}"),
}
}
#[test]
fn checkpoint_step_saves_checkpoint() {
let steps = three_step_workflow();
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::always_ok();
runner.run(&steps, &mut exec);
assert_eq!(runner.checkpoints().len(), 1);
assert_eq!(runner.last_checkpoint().unwrap().step_index, 1);
}
#[test]
fn resume_from_checkpoint_sets_correct_step_index() {
let cp = Checkpoint::new(2, 0xDEAD_BEEF);
let mut runner = DurableRunner::new();
runner.resume_from_checkpoint(&cp);
assert_eq!(runner.current_step(), 3);
}
#[test]
fn resumed_run_skips_already_completed_steps() {
let steps = vec![
click_step("step-1", "A"),
click_step("step-2", "B"),
click_step("step-3", "C"),
];
let cp = Checkpoint::new(1, 0);
let mut runner = DurableRunner::new();
runner.resume_from_checkpoint(&cp);
let mut exec = MockExecutor::always_ok();
let result = runner.run(&steps, &mut exec);
assert_eq!(
result,
WorkflowResult::Success {
steps_executed: 3,
total_retries: 0
}
);
assert_eq!(exec.received().len(), 1);
}
#[test]
fn reset_clears_state() {
let steps = three_step_workflow();
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::always_ok();
runner.run(&steps, &mut exec);
runner.reset();
assert_eq!(runner.current_step(), 0);
assert!(runner.checkpoints().is_empty());
}
#[test]
fn empty_workflow_succeeds_immediately() {
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::always_ok();
let result = runner.run(&[], &mut exec);
assert_eq!(
result,
WorkflowResult::Success {
steps_executed: 0,
total_retries: 0
}
);
}
#[test]
fn mock_executor_records_received_actions() {
let steps = three_step_workflow();
let mut runner = DurableRunner::new();
let mut exec = MockExecutor::always_ok();
runner.run(&steps, &mut exec);
assert_eq!(exec.received().len(), 3);
assert_eq!(exec.received()[0], StepAction::Click("File".into()));
assert_eq!(exec.received()[1], StepAction::Checkpoint);
assert_eq!(exec.received()[2], StepAction::Click("Save".into()));
}
}