dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! A task that blocks until an external event fires, then runs a handler.

use crate::context::Context;
use crate::error::TaskError;
use crate::tasks::r#trait::{Task, TaskOutput};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

/// Boxed, sendable future produced by one invocation of a handler.
type HandlerFuture = BoxFuture<'static, Result<TaskOutput, TaskError>>;

/// Type-erased handler closure stored on the task.
///
/// It takes the shared [`Context`] and yields a boxed future, which lets the
/// task struct stay non-generic regardless of the concrete closure type.
type HandlerFn = Box<dyn Fn(Arc<Context>) -> HandlerFuture + Send + Sync>;

/// A task that waits for a named event on the [`Context`] event bus before
/// running its handler.
///
/// Events are emitted via [`Context::emit`]. This enables event-driven
/// workflows where some tasks block until an external signal (a webhook, a
/// human approval, another task) arrives. An optional timeout bounds the wait.
pub struct EventDrivenTask {
    /// Identifier of this task.
    id: String,
    /// Ids of the tasks this task depends on.
    deps: Vec<String>,
    /// Scheduling priority value.
    priority: u8,
    /// Name of the event to wait for.
    event: String,
    /// Upper bound on the wait; `None` means wait without a time limit.
    timeout: Option<Duration>,
    /// Callback run once the event has fired.
    handler: HandlerFn,
}

// `Debug` cannot be derived because the boxed handler closure is not `Debug`,
// so it is written by hand and the handler is elided from the output.
impl std::fmt::Debug for EventDrivenTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EventDrivenTask")
            .field("id", &self.id)
            .field("deps", &self.deps)
            .field("priority", &self.priority)
            .field("event", &self.event)
            .field("timeout", &self.timeout)
            .finish_non_exhaustive()
    }
}

impl EventDrivenTask {
    /// Create a task that waits for `event`, then runs `handler`.
    ///
    /// The new task has no dependencies, priority `0`, and no timeout; chain
    /// the `with_*` methods to change any of these.
    ///
    /// `handler` is called with the shared [`Context`] and must return a
    /// future resolving to the task's result. The future is boxed so that the
    /// task type does not carry the closure's type parameters.
    #[must_use]
    pub fn new<F, Fut>(id: impl Into<String>, event: impl Into<String>, handler: F) -> Self
    where
        F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
    {
        EventDrivenTask {
            id: id.into(),
            deps: Vec::new(),
            priority: 0,
            event: event.into(),
            timeout: None,
            handler: Box::new(move |ctx| handler(ctx).boxed()),
        }
    }

    /// Bound the wait for the event; on expiry the task fails with a timeout.
    ///
    /// Consumes and returns the task, so the result must be used: dropping it
    /// discards the task together with the new setting.
    #[must_use = "builder methods consume the task and return the updated one"]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Declare dependencies.
    ///
    /// The given ids replace any previously declared dependencies; they are
    /// not appended to the existing list.
    #[must_use = "builder methods consume the task and return the updated one"]
    pub fn with_deps<I, S>(mut self, deps: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.deps = deps.into_iter().map(Into::into).collect();
        self
    }

    /// Set scheduling priority.
    ///
    /// The value is stored as-is; how it is ordered relative to other tasks is
    /// decided by the scheduler, not here.
    #[must_use = "builder methods consume the task and return the updated one"]
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }
}

#[async_trait]
impl Task for EventDrivenTask {
    /// Identifier this task was constructed with.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Owned copy of the declared dependency ids.
    fn dependencies(&self) -> Vec<String> {
        self.deps.clone()
    }

    /// Priority value configured for this task.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Wait for the configured event, then run the handler with `ctx`.
    ///
    /// # Errors
    ///
    /// - [`TaskError::Timeout`] when a timeout is configured and it elapses
    ///   before the wait completes.
    /// - [`TaskError::Cancelled`] when the wait completes with `false`.
    /// - Otherwise, whatever error the handler itself returns.
    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
        let event_fired = if let Some(limit) = self.timeout {
            // The elapsed-timer error carries no detail; report the configured limit instead.
            tokio::time::timeout(limit, ctx.wait_for(&self.event))
                .await
                .map_err(|_| TaskError::Timeout(limit))?
        } else {
            ctx.wait_for(&self.event).await
        };

        if event_fired {
            (self.handler)(ctx).await
        } else {
            // A `false` result from `wait_for` means the run was cancelled.
            Err(TaskError::Cancelled)
        }
    }
}