bronzeflow_core/task/
dag.rs1use crate::prelude::{RuntimeJoinHandle, SyncFn};
2use crate::runtime::{
3 BuildFromRunnable, Runnable, RunnableMetadata, RunnableMetadataBuilder, SafeMetadata,
4};
5use crate::task::{TaskInfo, TryIntoTask, WrappedTask};
6use bronzeflow_time::prelude::ScheduleExpr;
7use bronzeflow_time::schedule_time::ScheduleTimeHolder;
8use bronzeflow_utils::{BronzeError, Result};
9use std::sync::{Arc, Mutex};
10
11pub type DepTaskNode = Arc<Mutex<TaskNode>>;
12pub type TimeHoldType = Arc<Mutex<ScheduleTimeHolder>>;
13
14pub struct TaskNode {
16 pub(crate) task: TaskInfo,
17 pub(crate) meta: Option<RunnableMetadata>,
18 pub(crate) parents: Vec<DepTaskNode>,
19 pub(crate) children: Vec<DepTaskNode>,
20}
21
22#[derive(Clone)]
23pub struct DAG {
24 root_tasks: Vec<DepTaskNode>,
26 schedule: Option<ScheduleExpr>,
27 pub(crate) meta: Option<SafeMetadata>,
28}
29
30impl<T: TryIntoTask> From<T> for DAG {
31 fn from(value: T) -> Self {
32 let new_node = Arc::new(Mutex::new(TaskNode::new(value.try_into_task())));
33 DAG::new(vec![new_node])
34 }
35}
36
37impl BuildFromRunnable for DAG {
38 type Type = DAG;
39 fn build_from(runnable: impl Runnable<Handle = RuntimeJoinHandle<()>> + Send + 'static) -> DAG {
40 let task = TaskInfo::build_from(runnable);
41 let new_node = Arc::new(Mutex::new(TaskNode::new(task)));
42 DAG::new(vec![new_node])
43 }
44}
45
46impl TaskNode {
47 pub fn new(task: TaskInfo) -> Self {
48 TaskNode {
49 task,
50 meta: Some(RunnableMetadata::default()),
51 parents: vec![],
52 children: vec![],
53 }
54 }
55
56 pub fn run(&mut self) {
57 self.task.0.as_ref().lock().unwrap().0.run();
58 }
59
60 pub fn with_meta<T>(meta: T, task: TaskInfo) -> Self
61 where
62 T: Into<RunnableMetadata>,
63 {
64 TaskNode {
65 task,
66 meta: Some(meta.into()),
67 parents: vec![],
68 children: vec![],
69 }
70 }
71}
72
73unsafe impl Send for TaskNode {}
74
75impl DAG {
76 pub fn new(root_tasks: Vec<DepTaskNode>) -> Self {
77 DAG {
78 root_tasks,
79 schedule: None,
80 meta: None,
82 }
83 }
84
85 pub fn set_schedule(&mut self, schedule: ScheduleExpr) {
86 self.schedule = Some(schedule);
87 }
88
89 pub fn handle_top_node<F>(nodes: &Vec<DepTaskNode>, f: &mut F)
90 where
91 F: FnMut(DepTaskNode),
92 {
93 for node in nodes {
94 let parents = &node.as_ref().lock().unwrap().parents;
95 if !parents.is_empty() {
96 DAG::handle_top_node(parents, f);
97 } else {
98 f(node.clone());
99 }
100 }
101 }
102
103 pub fn prepare(&mut self) {
104 let mut time_holder = ScheduleTimeHolder::new(self.schedule.take().unwrap());
105 time_holder.init();
106 self.meta = Some(Arc::new(Mutex::new(
107 RunnableMetadataBuilder::default()
108 .id(None)
109 .name(None)
110 .maximum_run_times(None)
111 .maximum_parallelism(None)
112 .schedule(Some(time_holder))
113 .build()
114 .unwrap(),
115 )));
116 }
117
118 pub fn run(&mut self) {
119 self.run_task(|t| {
120 t.as_ref().lock().unwrap().run();
121 })
122 }
123
124 pub fn run_task<F>(&self, runner: F)
125 where
126 F: Fn(DepTaskNode),
127 {
128 for t in &self.root_tasks {
129 DAG::handle(false, t.clone(), |task| {
130 runner(task)
132 })
133 }
134 }
135
136 pub fn print_tree(&self) {
137 let mut s = vec![];
138 let f = |node: DepTaskNode, level| {
139 for _ in 0..level {
140 s.push(String::from(" "));
141 }
142 s.push(
143 node.as_ref()
144 .lock()
145 .unwrap()
146 .meta
147 .as_ref()
148 .unwrap()
149 .name
150 .as_ref()
151 .map_or_else(|| "".to_string(), |r| r.to_string()),
152 );
153 s.push("\n".to_string());
154 };
155 self.handle_with_level(f);
156 let tree_str = s.join("");
157 println!("{}", tree_str);
158 }
159
160 pub fn print_in_one_tree(&self) {
162 let mut vnode = TaskNode::new(TryIntoTask::try_into_task(SyncFn(|| println!("root"))));
163 for node in &self.root_tasks {
164 vnode.children.push(node.clone());
165 }
166 }
167
168 pub fn handle<F>(parent: bool, node: DepTaskNode, mut f: F)
169 where
170 F: FnMut(DepTaskNode),
171 {
172 f(node.clone());
173 let task = node.as_ref().lock().unwrap();
174 let list = if parent {
175 &task.parents
176 } else {
177 &task.children
178 };
179 for l in list {
180 f(l.clone());
181 }
182 }
183
184 pub fn handle_with_level<F>(&self, mut f: F)
185 where
186 F: FnMut(DepTaskNode, usize),
187 {
188 for node in &self.root_tasks {
189 DAG::handle_one_with_level(node.clone(), 0, &mut f);
190 }
191 }
192
193 pub(crate) fn handle_one_with_level<F>(node: DepTaskNode, level: usize, f: &mut F)
194 where
195 F: FnMut(DepTaskNode, usize),
196 {
197 f(node.clone(), level);
198 let task = node.as_ref().lock().unwrap();
199 let level = level + 1;
200 for t in &task.children {
201 DAG::handle_one_with_level(t.clone(), level, f)
202 }
203 }
204
205 pub fn cal_task_nums(&self) -> usize {
206 let mut v = vec![];
208 self.handle_with_level(|t, _| v.push(t));
209 v.len()
210 }
211
212 pub fn to_single_task(mut self) -> Result<WrappedTask> {
213 let t =
214 self.root_tasks.into_iter().next().ok_or_else(|| {
215 BronzeError::msg("DAG is empty, could not transform to single task")
216 })?;
217 let meta = self.meta.take();
218 let lock = Arc::try_unwrap(t)
219 .ok()
220 .ok_or_else(|| BronzeError::msg("Could take ownership from task point in DAG"))?;
221 let task_node = lock
222 .into_inner()
223 .ok()
224 .ok_or_else(|| BronzeError::msg("Could take ownership from locked task in DAG"))?;
225 Ok(WrappedTask::new(task_node.task, meta))
226 }
227}