szal 0.26.3

Workflow engine — step/flow execution with branching, retry, rollback, and parallel stages
Documentation
use crate::step::{StepDef, StepResult, StepStatus};

use super::StepHandler;

pub(crate) async fn execute_step_with_handler(step: &StepDef, handler: &StepHandler) -> StepResult {
    let max_attempts = step.max_retries + 1;
    let mut last_error = None;
    let total_start = std::time::Instant::now();

    tracing::debug!(step = %step.name, attempt = 1, "starting step execution");

    for attempt in 1..=max_attempts {
        let step_start = std::time::Instant::now();

        let fut = (handler)(step.clone());
        let result = if step.timeout_ms < u64::MAX {
            match tokio::time::timeout(std::time::Duration::from_millis(step.timeout_ms), fut).await
            {
                Ok(r) => r,
                Err(_) => {
                    tracing::warn!(step = %step.name, timeout_ms = step.timeout_ms, "step timed out");
                    Err(format!("timeout after {}ms", step.timeout_ms))
                }
            }
        } else {
            fut.await
        };

        let duration_ms = step_start.elapsed().as_millis() as u64;

        match result {
            Ok(output) => {
                tracing::debug!(
                    step = %step.name,
                    duration_ms,
                    attempts = attempt,
                    "step completed successfully"
                );
                return StepResult {
                    step_id: step.id,
                    status: StepStatus::Completed,
                    output,
                    duration_ms,
                    attempts: attempt,
                    error: None,
                };
            }
            Err(e) => {
                if attempt < max_attempts {
                    tracing::warn!(
                        step = %step.name,
                        attempt,
                        error = %e,
                        "step failed, retrying"
                    );
                }
                last_error = Some(e);
                if attempt < max_attempts {
                    tokio::time::sleep(std::time::Duration::from_millis(step.retry_delay_ms)).await;
                }
            }
        }
    }

    tracing::error!(
        step = %step.name,
        attempts = max_attempts,
        "step failed after all retries exhausted"
    );

    StepResult {
        step_id: step.id,
        status: StepStatus::Failed,
        output: serde_json::json!(null),
        duration_ms: total_start.elapsed().as_millis() as u64,
        attempts: max_attempts,
        error: last_error,
    }
}