dag_executor/tasks/
event_driven.rs1use crate::context::Context;
4use crate::error::TaskError;
5use crate::tasks::r#trait::{Task, TaskOutput};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use futures::FutureExt;
9use std::future::Future;
10use std::sync::Arc;
11use std::time::Duration;
12
13type HandlerFn =
14 Box<dyn Fn(Arc<Context>) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync>;
15
16pub struct EventDrivenTask {
23 id: String,
24 deps: Vec<String>,
25 priority: u8,
26 event: String,
27 timeout: Option<Duration>,
28 handler: HandlerFn,
29}
30
31impl EventDrivenTask {
32 pub fn new<F, Fut>(id: impl Into<String>, event: impl Into<String>, handler: F) -> Self
34 where
35 F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
36 Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
37 {
38 EventDrivenTask {
39 id: id.into(),
40 deps: Vec::new(),
41 priority: 0,
42 event: event.into(),
43 timeout: None,
44 handler: Box::new(move |ctx| handler(ctx).boxed()),
45 }
46 }
47
48 pub fn with_timeout(mut self, timeout: Duration) -> Self {
50 self.timeout = Some(timeout);
51 self
52 }
53
54 pub fn with_deps<I, S>(mut self, deps: I) -> Self
56 where
57 I: IntoIterator<Item = S>,
58 S: Into<String>,
59 {
60 self.deps = deps.into_iter().map(Into::into).collect();
61 self
62 }
63
64 pub fn with_priority(mut self, priority: u8) -> Self {
66 self.priority = priority;
67 self
68 }
69}
70
71#[async_trait]
72impl Task for EventDrivenTask {
73 fn id(&self) -> &str {
74 &self.id
75 }
76
77 fn dependencies(&self) -> Vec<String> {
78 self.deps.clone()
79 }
80
81 fn priority(&self) -> u8 {
82 self.priority
83 }
84
85 async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
86 let fired = match self.timeout {
87 Some(timeout) => match tokio::time::timeout(timeout, ctx.wait_for(&self.event)).await {
88 Ok(fired) => fired,
89 Err(_) => return Err(TaskError::Timeout(timeout)),
90 },
91 None => ctx.wait_for(&self.event).await,
92 };
93
94 if !fired {
95 return Err(TaskError::Cancelled);
97 }
98
99 (self.handler)(ctx).await
100 }
101}