dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! A simple closure-backed task.

use crate::context::Context;
use crate::error::TaskError;
use crate::tasks::r#trait::{Task, TaskOutput};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;

/// Boxed, sendable future produced by one invocation of a task body.
type TaskFuture = BoxFuture<'static, Result<TaskOutput, TaskError>>;

/// Type-erased task body: a thread-safe callable that takes the shared
/// context and hands back a boxed future resolving to the task's result.
type TaskFn = Box<dyn Fn(Arc<Context>) -> TaskFuture + Send + Sync>;

/// A task whose body is an async closure.
///
/// This is the workhorse for ad-hoc workflows and tests:
///
/// ```
/// use dag_executor::tasks::BasicTask;
/// let t = BasicTask::new("greet", |_ctx| async { Ok(serde_json::json!("hi")) })
///     .with_deps(["setup"])
///     .with_priority(10);
/// ```
///
/// The closure is stored type-erased, so `Debug` is implemented by hand and
/// prints only the id, dependencies and priority.
pub struct BasicTask {
    /// Identifier reported through `Task::id`.
    id: String,
    /// Ids of the tasks this one depends on, as reported by `Task::dependencies`.
    deps: Vec<String>,
    /// Scheduling priority reported through `Task::priority`; defaults to `0`.
    priority: u8,
    /// The boxed async closure that `Task::execute` invokes.
    func: TaskFn,
}

impl std::fmt::Debug for BasicTask {
    /// Formats the task's metadata; the closure is opaque and therefore
    /// elided (shown as `..` by `finish_non_exhaustive`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BasicTask")
            .field("id", &self.id)
            .field("deps", &self.deps)
            .field("priority", &self.priority)
            .finish_non_exhaustive()
    }
}

impl BasicTask {
    /// Create a task with id `id` running async closure `f`.
    pub fn new<F, Fut>(id: impl Into<String>, f: F) -> Self
    where
        F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
    {
        BasicTask {
            id: id.into(),
            deps: Vec::new(),
            priority: 0,
            func: Box::new(move |ctx| f(ctx).boxed()),
        }
    }

    /// Declare the tasks this one depends on.
    pub fn with_deps<I, S>(mut self, deps: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.deps = deps.into_iter().map(Into::into).collect();
        self
    }

    /// Set the scheduling priority (higher runs first).
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }
}

#[async_trait]
impl Task for BasicTask {
    /// Returns the id given at construction.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns an owned copy of the declared dependency ids.
    fn dependencies(&self) -> Vec<String> {
        self.deps.to_vec()
    }

    /// Returns the configured scheduling priority.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Invokes the stored closure with `ctx` and awaits the future it
    /// produces, passing its result through unchanged.
    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
        let pending = (self.func)(ctx);
        pending.await
    }
}