dag_executor/tasks/
basic.rs1use crate::context::Context;
4use crate::error::TaskError;
5use crate::tasks::r#trait::{Task, TaskOutput};
6use async_trait::async_trait;
7use futures::future::BoxFuture;
8use futures::FutureExt;
9use std::future::Future;
10use std::sync::Arc;
11
12type TaskFn =
13 Box<dyn Fn(Arc<Context>) -> BoxFuture<'static, Result<TaskOutput, TaskError>> + Send + Sync>;
14
15pub struct BasicTask {
26 id: String,
27 deps: Vec<String>,
28 priority: u8,
29 func: TaskFn,
30}
31
32impl BasicTask {
33 pub fn new<F, Fut>(id: impl Into<String>, f: F) -> Self
35 where
36 F: Fn(Arc<Context>) -> Fut + Send + Sync + 'static,
37 Fut: Future<Output = Result<TaskOutput, TaskError>> + Send + 'static,
38 {
39 BasicTask {
40 id: id.into(),
41 deps: Vec::new(),
42 priority: 0,
43 func: Box::new(move |ctx| f(ctx).boxed()),
44 }
45 }
46
47 pub fn with_deps<I, S>(mut self, deps: I) -> Self
49 where
50 I: IntoIterator<Item = S>,
51 S: Into<String>,
52 {
53 self.deps = deps.into_iter().map(Into::into).collect();
54 self
55 }
56
57 pub fn with_priority(mut self, priority: u8) -> Self {
59 self.priority = priority;
60 self
61 }
62}
63
64#[async_trait]
65impl Task for BasicTask {
66 fn id(&self) -> &str {
67 &self.id
68 }
69
70 fn dependencies(&self) -> Vec<String> {
71 self.deps.clone()
72 }
73
74 fn priority(&self) -> u8 {
75 self.priority
76 }
77
78 async fn execute(&self, ctx: Arc<Context>) -> Result<TaskOutput, TaskError> {
79 (self.func)(ctx).await
80 }
81}