dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Shared helpers for the test suites.

#![allow(dead_code)]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use dag_executor::context::Context;
use dag_executor::prelude::*;

/// A task that always succeeds, returning its id as output.
pub fn ok_task(id: &str, deps: &[&str]) -> Arc<dyn Task> {
    let label = id.to_string();
    Arc::new(
        BasicTask::new(id, move |_ctx: Arc<Context>| {
            let label = label.clone();
            async move { Ok(serde_json::json!(label)) }
        })
        .with_deps(deps.iter().map(|s| s.to_string())),
    )
}

/// A task that always fails.
pub fn failing_task(id: &str, deps: &[&str]) -> Arc<dyn Task> {
    Arc::new(
        BasicTask::new(id, |_ctx: Arc<Context>| async {
            Err(TaskError::execution("boom"))
        })
        .with_deps(deps.iter().map(|s| s.to_string())),
    )
}

/// A task that counts how many times it ran (via the returned counter) and
/// fails the first `fail_until` attempts before succeeding.
pub fn flaky_task(id: &str, fail_until: usize) -> (Arc<dyn Task>, Arc<AtomicUsize>) {
    let counter = Arc::new(AtomicUsize::new(0));
    let c = counter.clone();
    let task = BasicTask::new(id, move |_ctx: Arc<Context>| {
        let c = c.clone();
        async move {
            let n = c.fetch_add(1, Ordering::SeqCst);
            if n < fail_until {
                Err(TaskError::execution("flaky"))
            } else {
                Ok(serde_json::json!("ok"))
            }
        }
    });
    (Arc::new(task), counter)
}

/// A task that records each execution in the shared counter and succeeds.
pub fn counting_task(id: &str, deps: &[&str], counter: Arc<AtomicUsize>) -> Arc<dyn Task> {
    Arc::new(
        BasicTask::new(id, move |_ctx: Arc<Context>| {
            let counter = counter.clone();
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(serde_json::json!("ran"))
            }
        })
        .with_deps(deps.iter().map(|s| s.to_string())),
    )
}