bronzeflow_core/task/
builder.rs

1use crate::runtime::RunnableMetadata;
2use crate::task::dag::{DepTaskNode, TaskNode, DAG};
3use crate::task::TryIntoTask;
4use bronzeflow_utils::Result;
5use std::borrow::BorrowMut;
6use std::sync::{Arc, Mutex};
7
/// Fluent, by-value builder that assembles task nodes into a `DAG`.
///
/// The builder tracks one "current" node, which is the target of `set_name`,
/// `parent` and `child`, plus the list of nodes that were added before it.
#[derive(Default)]
pub struct DAGBuilder {
    /// Node most recently added through `task` (or adopted through `merge`);
    /// `None` until the first task is added.
    curr_node: Option<DepTaskNode>,
    /// Nodes added earlier; `build_vec` appends `curr_node` to this list.
    roots: Vec<DepTaskNode>,
}
13
14impl DAGBuilder {
15    pub fn new() -> DAGBuilder {
16        DAGBuilder {
17            curr_node: None,
18            roots: vec![],
19        }
20    }
21
22    /// Add a task to DAG as dag node
23    ///
24    pub fn task<M, T>(mut self, meta: M, task: T) -> Self
25    where
26        M: Into<RunnableMetadata>,
27        T: TryIntoTask,
28    {
29        if let Some(node) = self.curr_node {
30            self.roots.push(node);
31        }
32        let new_node = Some(Arc::new(Mutex::new(TaskNode::with_meta(
33            meta,
34            task.try_into_task(),
35        ))));
36        self.curr_node = new_node;
37        self
38    }
39
40    /// Set name for current task
41    pub fn set_name(mut self, name: &str) -> Self {
42        let node = self.curr_node.take();
43        if let Some(node) = node {
44            node.as_ref()
45                .lock()
46                .unwrap()
47                .borrow_mut()
48                .meta
49                .as_mut()
50                .map(|r| r.set_name(name.to_string()));
51            self.curr_node = Some(node);
52        }
53        self
54    }
55
56    /// Add a parent task to current task
57    pub fn parent<F>(mut self, mut bf: F) -> Self
58    where
59        F: FnMut(Self) -> Self,
60    {
61        let parents = bf(DAGBuilder::new()).build_vec();
62
63        if let Some(ref mut node) = self.curr_node {
64            for p in parents {
65                p.as_ref().lock().unwrap().children.push(node.clone());
66                node.as_ref().lock().unwrap().parents.push(p.clone());
67            }
68        }
69        self
70    }
71
72    /// Add a child task to current task
73    pub fn child<F>(mut self, mut bf: F) -> Self
74    where
75        F: FnMut(Self) -> Self,
76    {
77        let children = bf(DAGBuilder::new()).build_vec();
78
79        if let Some(ref mut node) = self.curr_node {
80            for c in children {
81                c.as_ref().lock().unwrap().parents.push(node.clone());
82                node.as_ref().lock().unwrap().children.push(c.clone());
83            }
84        }
85        self
86    }
87
88    pub fn merge(mut self, mut other: DAGBuilder) -> Self {
89        if self.curr_node.is_none() {
90            self.curr_node = other.curr_node.take();
91        }
92        self.roots.append(&mut other.build_vec());
93        self
94    }
95
96    fn build_vec(mut self) -> Vec<DepTaskNode> {
97        if let Some(node) = self.curr_node {
98            self.roots.push(node);
99        }
100        self.roots
101    }
102
103    pub fn build(self) -> Result<DAG> {
104        let vec = self.build_vec();
105        let mut roots = vec![];
106        let mut f = |node: DepTaskNode| {
107            roots.push(node);
108        };
109        DAG::handle_top_node(&vec, &mut f);
110        Ok(DAG::new(roots))
111    }
112}
113
114impl<F: TryIntoTask> From<F> for DAGBuilder {
115    fn from(task: F) -> Self {
116        DAGBuilder::new().task("", task)
117    }
118}
119
/// Declaratively assembles a `DAGBuilder`.
///
/// Supported forms:
///
/// * `dag!(expr)` - evaluates to `expr` unchanged. The list form relies on
///   this so that a task position may hold either a task or a nested
///   `dag!(...)` builder.
/// * `dag!(name => task => parent)` - builds `name => task` and attaches the
///   builder produced by `parent` through `DAGBuilder::parent`.
/// * `dag!(name => task, ...)` - converts every `task` with
///   `DAGBuilder::from`, names it with `set_name(name)` and merges the
///   results, in order, into one builder. A trailing comma is accepted.
///
/// Recursive invocations go through `$crate::dag!` so the macro also works
/// when it is invoked by path and `dag` itself is not in the caller's scope.
#[macro_export]
macro_rules! dag {
    // Pass-through arm: a lone statement/expression is emitted as-is.
    ($dag_builder:stmt) => {{
        {
            $dag_builder
        }
    }};

    // One named task with the builder expression that supplies its parents.
    ($task_name:expr => $task:expr => $parent:expr) => {
        {
            let tmp_dag = $crate::dag! {$task_name => $task};
            tmp_dag.parent(|_| {
                $crate::dag!($parent)
            })
        }
    };

    // One or more `name => task` pairs merged into a single builder.
    ( $( $task_name:expr => $task:expr ), + $(,)?) => {
        {
            let mut builder = $crate::task::builder::DAGBuilder::new();
            $(
                let task = $crate::dag!{$task};
                let tmp_builder = $crate::task::builder::DAGBuilder::from(task).set_name($task_name);
                builder = builder.merge(tmp_builder);
            )+
            builder
        }
    }
}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::prelude::{AsyncFn, SyncFn};

    /// Single-task builders accept a plain closure, an `AsyncFn` and a `SyncFn`.
    #[test]
    fn create_dag() {
        let sync_builder = DAGBuilder::new().task("T1", || {
            println!("I am a task");
        });
        let _ = sync_builder.build().unwrap();

        let async_builder = DAGBuilder::new().task(
            "T2",
            AsyncFn(|| async {
                println!("I am a asynchronous task");
            }),
        );
        let _ = async_builder.build().unwrap();

        let _ = DAGBuilder::from(SyncFn(|| {
            println!("Task");
        }));
    }

    /// Nested `parent` / `child` sub-builders can be attached to one task.
    #[test]
    fn create_dag_with_parent_and_child() {
        // Sub-graph attached above "Task C".
        fn upstream(builder: DAGBuilder) -> DAGBuilder {
            builder
                .task("Task D", || println!("Task D"))
                .task("Task D1", || println!("Task D1"))
                .parent(|grand| {
                    grand
                        .task("Task E1", || {})
                        .task("Task E2", || {})
                        .task("Task E3", || {})
                })
        }

        // Sub-graph attached below "Task C".
        fn downstream(builder: DAGBuilder) -> DAGBuilder {
            builder
                .task("Task C1", || println!("Task C1"))
                .task("Task C2", || println!("Task C2"))
                .child(|leaf| leaf.task("Task B1", || {}).task("Task B2", || {}))
        }

        let graph = DAGBuilder::new()
            .task("Task C", || println!("Task C"))
            .parent(upstream)
            .child(downstream)
            .build()
            .unwrap();
        graph.print_tree();
    }

    /// The list form of `dag!` works with one task and with several tasks.
    #[test]
    fn test_macro_create_dag() {
        let single = dag!("first" => || {println!("Task first")})
            .build()
            .unwrap();
        let several = dag!(
            "hello" => || println!("Task A"),
            "name" => || println!("Task A"),
            "fn tas" => || { println!("Im function task") }
        )
        .build()
        .unwrap();
        single.print_tree();
        several.print_tree();
    }

    /// `name => task => parent` attaches a single parent builder.
    #[test]
    fn test_macro_single_parent() {
        let builder = dag!(
            "P1A" => || println!("P1-A") => dag!(
                "P1A1" => || println!("P1-A1")
            )
        );
        let graph = builder.build().unwrap();
        graph.print_tree();
    }

    /// A parent builder may itself list several tasks, including nested `dag!` calls.
    #[test]
    fn test_macro_multi_parents() {
        let builder = dag!(
            "A" => || println!("Task A") => dag!(
                "A1" => || println!("Task A1"),
                "A2" => || println!("Task A2"),
                "C1" => dag!(
                    "C1" => || println!("Task C1") => dag!(
                        "C11" => || println!("Task C11")
                    )
                )
            )
        );
        let graph = builder.build().unwrap();
        graph.print_tree();
    }
}