dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! A task that evaluates a predicate and records which branch was taken.

use crate::context::Context;
use crate::error::TaskError;
use crate::tasks::r#trait::{Task, TaskOutput};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;

type PredicateFn =
    Box<dyn Fn(Arc<Context>) -> BoxFuture<'static, Result<bool, TaskError>> + Send + Sync>;

/// A branching task.
///
/// It evaluates an async predicate and records the chosen branch label on the
/// context blackboard under the key `{id}.branch`. Downstream tasks can read
/// that value to decide whether to do real work or short-circuit, implementing
/// conditional control flow within an otherwise static DAG.
///
/// The task's own output is `{"taken": <bool>, "branch": <label>}`.
pub struct ConditionalTask {
    id: String,
    deps: Vec<String>,
    priority: u8,
    predicate: PredicateFn,
    on_true: String,
    on_false: String,
}

impl ConditionalTask {
    /// Create a conditional with the given async predicate.
    ///
    /// `on_true`/`on_false` are the branch labels published to the blackboard.
    pub fn new<F, Fut>(
        id: impl Into<String>,
        on_true: impl Into<String>,
        on_false: impl Into<String>,
        predicate: F,
    ) -> Self
    where
        F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<bool, TaskError>> + Send + 'static,
    {
        ConditionalTask {
            id: id.into(),
            deps: Vec::new(),
            priority: 0,
            predicate: Box::new(move |ctx| predicate(ctx).boxed()),
            on_true: on_true.into(),
            on_false: on_false.into(),
        }
    }

    /// Declare dependencies.
    pub fn with_deps<I, S>(mut self, deps: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.deps = deps.into_iter().map(Into::into).collect();
        self
    }

    /// Set scheduling priority.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }
}

#[async_trait]
impl Task for ConditionalTask {
    fn id(&self) -> &str {
        &self.id
    }

    fn dependencies(&self) -> Vec<String> {
        self.deps.clone()
    }

    fn priority(&self) -> u8 {
        self.priority
    }

    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
        let taken = (self.predicate)(ctx.clone()).await?;
        let branch = if taken { &self.on_true } else { &self.on_false };
        ctx.set(
            format!("{}.branch", self.id),
            serde_json::Value::String(branch.clone()),
        );
        Ok(serde_json::json!({ "taken": taken, "branch": branch }))
    }
}