dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! A task that repeats a body until a break condition or iteration cap.

use crate::context::Context;
use crate::error::TaskError;
use crate::tasks::r#trait::{Task, TaskOutput};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;

/// Type-erased loop body.
///
/// Called with a shared handle to the execution context and a `u32` (the
/// zero-based iteration index when invoked by `LoopTask::execute`). It returns
/// a boxed `'static` future resolving to that iteration's output or an error.
type BodyFn = Box<
    dyn Fn(Arc<Context>, u32) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync,
>;
/// Type-erased early-exit predicate.
///
/// Inspects a body output by reference; returning `true` ends the loop.
type BreakFn = Box<dyn Fn(&TaskOutput) -> bool + Send + Sync>;

/// A task that runs `body` repeatedly until `break_when` returns true or the
/// iteration count reaches `max_iterations`.
///
/// The body receives the zero-based iteration index. The task output is
/// `{"iterations": <n>, "last": <last body output>, "broke_early": <bool>}`.
/// When no iteration runs (`max_iterations == 0`), `last` is JSON `null`.
/// `broke_early` is `true` whenever the predicate fired, including when it
/// fired on the final permitted iteration.
///
/// Cancellation is honored between iterations: the context is checked before
/// each body invocation, not while a body future is in flight.
pub struct LoopTask {
    /// Task identifier, returned by `Task::id`.
    id: String,
    /// Identifiers of the tasks this one depends on; empty unless set via `with_deps`.
    deps: Vec<String>,
    /// Scheduling priority; `0` unless set via `with_priority`.
    priority: u8,
    /// Upper bound on the number of body invocations.
    max_iterations: u32,
    /// Work performed on every iteration.
    body: BodyFn,
    /// Early-exit predicate evaluated against each body output.
    break_when: BreakFn,
}

impl LoopTask {
    /// Create a loop task.
    pub fn new<F, Fut>(id: impl Into<String>, max_iterations: u32, body: F) -> Self
    where
        F: Fn(Arc<Context>, u32) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
    {
        LoopTask {
            id: id.into(),
            deps: Vec::new(),
            priority: 0,
            max_iterations,
            body: Box::new(move |ctx, i| body(ctx, i).boxed()),
            // By default, never break early — run all iterations.
            break_when: Box::new(|_| false),
        }
    }

    /// Set the early-exit predicate, evaluated against each body output.
    pub fn with_break<F>(mut self, break_when: F) -> Self
    where
        F: Fn(&TaskOutput) -> bool + Send + Sync + 'static,
    {
        self.break_when = Box::new(break_when);
        self
    }

    /// Declare dependencies.
    pub fn with_deps<I, S>(mut self, deps: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.deps = deps.into_iter().map(Into::into).collect();
        self
    }

    /// Set scheduling priority.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }
}

#[async_trait]
impl Task for LoopTask {
    /// Identifier of this task.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Owned copy of the declared dependency identifiers.
    fn dependencies(&self) -> Vec<String> {
        self.deps.to_vec()
    }

    /// Scheduling priority of this task.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Run the body up to `max_iterations` times, stopping at the first output
    /// for which the break predicate returns `true`.
    ///
    /// Returns a JSON object with the number of completed iterations, the
    /// last body output (`null` if the body never ran), and whether the
    /// predicate ended the loop.
    ///
    /// # Errors
    ///
    /// Returns `TaskError::Cancelled` if the context reports cancellation
    /// before an iteration starts, and propagates the first error returned by
    /// the body.
    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
        let mut completed = 0u32;
        let mut last_output = serde_json::Value::Null;
        let mut stopped_by_predicate = false;

        for index in 0..self.max_iterations {
            // Cancellation is only observed here, before the body is started.
            if ctx.is_cancelled() {
                return Err(TaskError::Cancelled);
            }

            last_output = (self.body)(Arc::clone(&ctx), index).await?;
            // `index` is below `max_iterations`, so this cannot overflow.
            completed = index + 1;

            if (self.break_when)(&last_output) {
                stopped_by_predicate = true;
                break;
            }
        }

        let summary = serde_json::json!({
            "iterations": completed,
            "last": last_output,
            "broke_early": stopped_by_predicate,
        });
        Ok(summary)
    }
}