bronzeflow_core/task/
builder.rs1use crate::runtime::RunnableMetadata;
2use crate::task::dag::{DepTaskNode, TaskNode, DAG};
3use crate::task::TryIntoTask;
4use bronzeflow_utils::Result;
5use std::borrow::BorrowMut;
6use std::sync::{Arc, Mutex};
7
8#[derive(Default)]
9pub struct DAGBuilder {
10 curr_node: Option<DepTaskNode>,
11 roots: Vec<DepTaskNode>,
12}
13
14impl DAGBuilder {
15 pub fn new() -> DAGBuilder {
16 DAGBuilder {
17 curr_node: None,
18 roots: vec![],
19 }
20 }
21
22 pub fn task<M, T>(mut self, meta: M, task: T) -> Self
25 where
26 M: Into<RunnableMetadata>,
27 T: TryIntoTask,
28 {
29 if let Some(node) = self.curr_node {
30 self.roots.push(node);
31 }
32 let new_node = Some(Arc::new(Mutex::new(TaskNode::with_meta(
33 meta,
34 task.try_into_task(),
35 ))));
36 self.curr_node = new_node;
37 self
38 }
39
40 pub fn set_name(mut self, name: &str) -> Self {
42 let node = self.curr_node.take();
43 if let Some(node) = node {
44 node.as_ref()
45 .lock()
46 .unwrap()
47 .borrow_mut()
48 .meta
49 .as_mut()
50 .map(|r| r.set_name(name.to_string()));
51 self.curr_node = Some(node);
52 }
53 self
54 }
55
56 pub fn parent<F>(mut self, mut bf: F) -> Self
58 where
59 F: FnMut(Self) -> Self,
60 {
61 let parents = bf(DAGBuilder::new()).build_vec();
62
63 if let Some(ref mut node) = self.curr_node {
64 for p in parents {
65 p.as_ref().lock().unwrap().children.push(node.clone());
66 node.as_ref().lock().unwrap().parents.push(p.clone());
67 }
68 }
69 self
70 }
71
72 pub fn child<F>(mut self, mut bf: F) -> Self
74 where
75 F: FnMut(Self) -> Self,
76 {
77 let children = bf(DAGBuilder::new()).build_vec();
78
79 if let Some(ref mut node) = self.curr_node {
80 for c in children {
81 c.as_ref().lock().unwrap().parents.push(node.clone());
82 node.as_ref().lock().unwrap().children.push(c.clone());
83 }
84 }
85 self
86 }
87
88 pub fn merge(mut self, mut other: DAGBuilder) -> Self {
89 if self.curr_node.is_none() {
90 self.curr_node = other.curr_node.take();
91 }
92 self.roots.append(&mut other.build_vec());
93 self
94 }
95
96 fn build_vec(mut self) -> Vec<DepTaskNode> {
97 if let Some(node) = self.curr_node {
98 self.roots.push(node);
99 }
100 self.roots
101 }
102
103 pub fn build(self) -> Result<DAG> {
104 let vec = self.build_vec();
105 let mut roots = vec![];
106 let mut f = |node: DepTaskNode| {
107 roots.push(node);
108 };
109 DAG::handle_top_node(&vec, &mut f);
110 Ok(DAG::new(roots))
111 }
112}
113
114impl<F: TryIntoTask> From<F> for DAGBuilder {
115 fn from(task: F) -> Self {
116 DAGBuilder::new().task("", task)
117 }
118}
119
/// Declaratively assembles a `DAGBuilder`.
///
/// Three forms are accepted:
///
/// * `dag!(stmt)` - passes a single statement or expression through
///   unchanged; the other forms use it to accept either a task value or an
///   already-built builder.
/// * `dag!(name => task => parent)` - builds `name => task` and then calls
///   `.parent(..)` on it with the builder produced by `dag!(parent)`.
/// * `dag!(name => task, ...)` - builds one named builder per pair via
///   `DAGBuilder::from(task).set_name(name)` and merges them in order; a
///   trailing comma is allowed.
///
/// Recursive invocations go through `$crate::dag!` so they always resolve
/// to this macro, regardless of which macros are in scope at the call site.
#[macro_export]
macro_rules! dag {
    ($dag_builder:stmt) => {{
        {
            $dag_builder
        }
    }};

    ($task_name:expr => $task:expr => $parent:expr) => {
        {
            let tmp_dag = $crate::dag! {$task_name => $task};
            tmp_dag.parent(|_| {
                $crate::dag!($parent)
            })
        }
    };

    ( $( $task_name:expr => $task:expr ), + $(,)?) => {
        {
            let mut builder = $crate::task::builder::DAGBuilder::new();
            $(
                let task = $crate::dag!{$task};
                let tmp_builder = $crate::task::builder::DAGBuilder::from(task).set_name($task_name);
                builder = builder.merge(tmp_builder);
            )+
            builder
        }
    }
}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::prelude::{AsyncFn, SyncFn};

    /// A single-task DAG can be created from a plain closure, an `AsyncFn`
    /// wrapper, or (through `From`) a `SyncFn` wrapper.
    #[test]
    fn create_dag() {
        let sync_builder = DAGBuilder::new().task("T1", || {
            println!("I am a task");
        });
        let _ = sync_builder.build().unwrap();

        let async_builder = DAGBuilder::new().task(
            "T2",
            AsyncFn(|| async {
                println!("I am a asynchronous task");
            }),
        );
        let _ = async_builder.build().unwrap();

        let _ = DAGBuilder::from(SyncFn(|| {
            println!("Task");
        }));
    }

    /// Nested `parent` / `child` callbacks build a multi-level graph around "Task C".
    #[test]
    fn create_dag_with_parent_and_child() {
        // Upstream side: D and D1 are parents of C; E1..E3 are parents of D1.
        let upstream = |bd: DAGBuilder| {
            bd.task("Task D", || println!("Task D"))
                .task("Task D1", || println!("Task D1"))
                .parent(|bd2| {
                    bd2.task("Task E1", || {})
                        .task("Task E2", || {})
                        .task("Task E3", || {})
                })
        };
        // Downstream side: C1 and C2 are children of C; B1 and B2 are children of C2.
        let downstream = |bd: DAGBuilder| {
            bd.task("Task C1", || println!("Task C1"))
                .task("Task C2", || println!("Task C2"))
                .child(|bd2| bd2.task("Task B1", || {}).task("Task B2", || {}))
        };

        let graph = DAGBuilder::new()
            .task("Task C", || println!("Task C"))
            .parent(upstream)
            .child(downstream)
            .build()
            .unwrap();
        graph.print_tree();
    }

    /// The `dag!` macro accepts both a single pair and a comma-separated list.
    #[test]
    fn test_macro_create_dag() {
        let single = dag!("first" => || {println!("Task first")})
            .build()
            .unwrap();
        let several = dag!(
            "hello" => || println!("Task A"),
            "name" => || println!("Task A"),
            "fn tas" => ||{println!("Im function task")}
        )
        .build()
        .unwrap();
        single.print_tree();
        several.print_tree();
    }

    /// The three-part `dag!` form attaches a single parent.
    #[test]
    fn test_macro_single_parent() {
        let graph = dag!(
            "P1A" => ||println!("P1-A") => dag!(
                "P1A1" => ||println!("P1-A1")
            )
        )
        .build()
        .unwrap();
        graph.print_tree();
    }

    /// The three-part `dag!` form accepts a list of parents, which may themselves be nested.
    #[test]
    fn test_macro_multi_parents() {
        let graph = dag!(
            "A" => || println!("Task A") => dag!(
                "A1" => || println!("Task A1"),
                "A2" => || println!("Task A2"),
                "C1" => dag!(
                    "C1" => || println!("Task C1") => dag!(
                        "C11" => || println!("Task C11")
                    )
                )
            )
        )
        .build()
        .unwrap();
        graph.print_tree();
    }
}