szal 0.26.3

Workflow engine — step/flow execution with branching, retry, rollback, and parallel stages
Documentation
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

use crate::step::{StepDef, StepResult, StepStatus};

use super::StepHandler;
use super::step_exec::execute_step_with_handler;

pub(crate) async fn run_parallel(
    steps: &[StepDef],
    handler: &StepHandler,
    max_concurrency: usize,
    timeout_ms: u64,
    start: std::time::Instant,
    token: Option<&CancellationToken>,
) -> Vec<StepResult> {
    tracing::debug!(steps = steps.len(), "running parallel execution");
    let sem = Arc::new(Semaphore::new(max_concurrency.max(1)));
    let mut handles = Vec::with_capacity(steps.len());

    let mut step_ids = Vec::with_capacity(steps.len());
    for step in steps {
        step_ids.push(step.id);
        let sem = sem.clone();
        let handler = handler.clone();
        let step = step.clone();
        handles.push(tokio::spawn(async move {
            let _permit = match sem.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    return StepResult {
                        step_id: step.id,
                        status: StepStatus::Failed,
                        output: serde_json::json!(null),
                        duration_ms: 0,
                        attempts: 0,
                        error: Some("semaphore closed".into()),
                    };
                }
            };
            execute_step_with_handler(&step, &handler).await
        }));
    }

    let mut results = Vec::with_capacity(handles.len());
    for (handle, original_id) in handles.into_iter().zip(step_ids) {
        let cancelled = token.is_some_and(|t| t.is_cancelled());
        if cancelled || start.elapsed().as_millis() as u64 > timeout_ms {
            handle.abort();
            results.push(StepResult {
                step_id: original_id,
                status: StepStatus::Skipped,
                output: serde_json::json!(null),
                duration_ms: 0,
                attempts: 0,
                error: Some(if cancelled {
                    "cancelled".into()
                } else {
                    "flow timeout exceeded".into()
                }),
            });
            continue;
        }
        match handle.await {
            Ok(result) => results.push(result),
            Err(e) => {
                tracing::error!(step_id = %original_id, error = %e, "spawned task panicked");
                results.push(StepResult {
                    step_id: original_id,
                    status: StepStatus::Failed,
                    output: serde_json::json!(null),
                    duration_ms: 0,
                    attempts: 0,
                    error: Some(format!("task panicked: {e}")),
                });
            }
        }
    }
    results
}