Skip to main content

dag_executor/tasks/
basic.rs

1//! A simple closure-backed task.
2
3use crate::context::Context;
4use crate::error::TaskError;
5use crate::tasks::r#trait::{Task, TaskOutput};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use futures::FutureExt;
9use std::future::Future;
10use std::sync::Arc;
11
12type TaskFn =
13    Box<dyn Fn(Arc<Context>) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync>;
14
15/// A task whose body is an async closure.
16///
17/// This is the workhorse for ad-hoc workflows and tests:
18///
19/// ```
20/// use dag_executor::tasks::BasicTask;
21/// let t = BasicTask::new("greet", |_ctx| async { Ok(serde_json::json!("hi")) })
22///     .with_deps(["setup"])
23///     .with_priority(10);
24/// ```
25pub struct BasicTask {
26    id: String,
27    deps: Vec<String>,
28    priority: u8,
29    func: TaskFn,
30}
31
32impl BasicTask {
33    /// Create a task with id `id` running async closure `f`.
34    pub fn new<F, Fut>(id: impl Into<String>, f: F) -> Self
35    where
36        F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
37        Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
38    {
39        BasicTask {
40            id: id.into(),
41            deps: Vec::new(),
42            priority: 0,
43            func: Box::new(move |ctx| f(ctx).boxed()),
44        }
45    }
46
47    /// Declare the tasks this one depends on.
48    pub fn with_deps<I, S>(mut self, deps: I) -> Self
49    where
50        I: IntoIterator<Item = S>,
51        S: Into<String>,
52    {
53        self.deps = deps.into_iter().map(Into::into).collect();
54        self
55    }
56
57    /// Set the scheduling priority (higher runs first).
58    pub fn with_priority(mut self, priority: u8) -> Self {
59        self.priority = priority;
60        self
61    }
62}
63
64#[async_trait]
65impl Task for BasicTask {
66    fn id(&self) -> &str {
67        &self.id
68    }
69
70    fn dependencies(&self) -> Vec<String> {
71        self.deps.clone()
72    }
73
74    fn priority(&self) -> u8 {
75        self.priority
76    }
77
78    async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
79        (self.func)(ctx).await
80    }
81}