Skip to main content

dag_executor/tasks/
event_driven.rs

1//! A task that blocks until an external event fires, then runs a handler.
2
3use crate::context::Context;
4use crate::error::TaskError;
5use crate::tasks::r#trait::{Task, TaskOutput};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use futures::FutureExt;
9use std::future::Future;
10use std::sync::Arc;
11use std::time::Duration;
12
13type HandlerFn =
14    Box<dyn Fn(Arc<Context>) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync>;
15
16/// A task that waits for a named event on the [`Context`] event bus before
17/// running its handler.
18///
19/// Events are emitted via [`Context::emit`]. This enables event-driven
20/// workflows where some tasks block until an external signal (a webhook, a
21/// human approval, another task) arrives. An optional timeout bounds the wait.
22pub struct EventDrivenTask {
23    id: String,
24    deps: Vec<String>,
25    priority: u8,
26    event: String,
27    timeout: Option<Duration>,
28    handler: HandlerFn,
29}
30
31impl EventDrivenTask {
32    /// Create a task that waits for `event`, then runs `handler`.
33    pub fn new<F, Fut>(id: impl Into<String>, event: impl Into<String>, handler: F) -> Self
34    where
35        F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
36        Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
37    {
38        EventDrivenTask {
39            id: id.into(),
40            deps: Vec::new(),
41            priority: 0,
42            event: event.into(),
43            timeout: None,
44            handler: Box::new(move |ctx| handler(ctx).boxed()),
45        }
46    }
47
48    /// Bound the wait for the event; on expiry the task fails with a timeout.
49    pub fn with_timeout(mut self, timeout: Duration) -> Self {
50        self.timeout = Some(timeout);
51        self
52    }
53
54    /// Declare dependencies.
55    pub fn with_deps<I, S>(mut self, deps: I) -> Self
56    where
57        I: IntoIterator<Item = S>,
58        S: Into<String>,
59    {
60        self.deps = deps.into_iter().map(Into::into).collect();
61        self
62    }
63
64    /// Set scheduling priority.
65    pub fn with_priority(mut self, priority: u8) -> Self {
66        self.priority = priority;
67        self
68    }
69}
70
71#[async_trait]
72impl Task for EventDrivenTask {
73    fn id(&self) -> &str {
74        &self.id
75    }
76
77    fn dependencies(&self) -> Vec<String> {
78        self.deps.clone()
79    }
80
81    fn priority(&self) -> u8 {
82        self.priority
83    }
84
85    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
86        let fired = match self.timeout {
87            Some(timeout) => match tokio::time::timeout(timeout, ctx.wait_for(&self.event)).await {
88                Ok(fired) => fired,
89                Err(_) => return Err(TaskError::Timeout(timeout)),
90            },
91            None => ctx.wait_for(&self.event).await,
92        };
93
94        if !fired {
95            // wait_for returned false => the run was cancelled.
96            return Err(TaskError::Cancelled);
97        }
98
99        (self.handler)(ctx).await
100    }
101}