bronzeflow_core/task/
dag.rs

1use crate::prelude::{RuntimeJoinHandle, SyncFn};
2use crate::runtime::{
3    BuildFromRunnable, Runnable, RunnableMetadata, RunnableMetadataBuilder, SafeMetadata,
4};
5use crate::task::{TaskInfo, TryIntoTask, WrappedTask};
6use bronzeflow_time::prelude::ScheduleExpr;
7use bronzeflow_time::schedule_time::ScheduleTimeHolder;
8use bronzeflow_utils::{BronzeError, Result};
9use std::sync::{Arc, Mutex};
10
11pub type DepTaskNode = Arc<Mutex<TaskNode>>;
12pub type TimeHoldType = Arc<Mutex<ScheduleTimeHolder>>;
13
14// #[derive(Debug)]
15pub struct TaskNode {
16    pub(crate) task: TaskInfo,
17    pub(crate) meta: Option<RunnableMetadata>,
18    pub(crate) parents: Vec<DepTaskNode>,
19    pub(crate) children: Vec<DepTaskNode>,
20}
21
22#[derive(Clone)]
23pub struct DAG {
24    // name: Option<String>,
25    root_tasks: Vec<DepTaskNode>,
26    schedule: Option<ScheduleExpr>,
27    pub(crate) meta: Option<SafeMetadata>,
28}
29
30impl<T: TryIntoTask> From<T> for DAG {
31    fn from(value: T) -> Self {
32        let new_node = Arc::new(Mutex::new(TaskNode::new(value.try_into_task())));
33        DAG::new(vec![new_node])
34    }
35}
36
37impl BuildFromRunnable for DAG {
38    type Type = DAG;
39    fn build_from(runnable: impl Runnable<Handle = RuntimeJoinHandle<()>> + Send + 'static) -> DAG {
40        let task = TaskInfo::build_from(runnable);
41        let new_node = Arc::new(Mutex::new(TaskNode::new(task)));
42        DAG::new(vec![new_node])
43    }
44}
45
46impl TaskNode {
47    pub fn new(task: TaskInfo) -> Self {
48        TaskNode {
49            task,
50            meta: Some(RunnableMetadata::default()),
51            parents: vec![],
52            children: vec![],
53        }
54    }
55
56    pub fn run(&mut self) {
57        self.task.0.as_ref().lock().unwrap().0.run();
58    }
59
60    pub fn with_meta<T>(meta: T, task: TaskInfo) -> Self
61    where
62        T: Into<RunnableMetadata>,
63    {
64        TaskNode {
65            task,
66            meta: Some(meta.into()),
67            parents: vec![],
68            children: vec![],
69        }
70    }
71}
72
73unsafe impl Send for TaskNode {}
74
75impl DAG {
76    pub fn new(root_tasks: Vec<DepTaskNode>) -> Self {
77        DAG {
78            root_tasks,
79            schedule: None,
80            // name: None,
81            meta: None,
82        }
83    }
84
85    pub fn set_schedule(&mut self, schedule: ScheduleExpr) {
86        self.schedule = Some(schedule);
87    }
88
89    pub fn handle_top_node<F>(nodes: &Vec<DepTaskNode>, f: &mut F)
90    where
91        F: FnMut(DepTaskNode),
92    {
93        for node in nodes {
94            let parents = &node.as_ref().lock().unwrap().parents;
95            if !parents.is_empty() {
96                DAG::handle_top_node(parents, f);
97            } else {
98                f(node.clone());
99            }
100        }
101    }
102
103    pub fn prepare(&mut self) {
104        let mut time_holder = ScheduleTimeHolder::new(self.schedule.take().unwrap());
105        time_holder.init();
106        self.meta = Some(Arc::new(Mutex::new(
107            RunnableMetadataBuilder::default()
108                .id(None)
109                .name(None)
110                .maximum_run_times(None)
111                .maximum_parallelism(None)
112                .schedule(Some(time_holder))
113                .build()
114                .unwrap(),
115        )));
116    }
117
118    pub fn run(&mut self) {
119        self.run_task(|t| {
120            t.as_ref().lock().unwrap().run();
121        })
122    }
123
124    pub fn run_task<F>(&self, runner: F)
125    where
126        F: Fn(DepTaskNode),
127    {
128        for t in &self.root_tasks {
129            DAG::handle(false, t.clone(), |task| {
130                // task.as_ref().lock().unwrap().run();
131                runner(task)
132            })
133        }
134    }
135
136    pub fn print_tree(&self) {
137        let mut s = vec![];
138        let f = |node: DepTaskNode, level| {
139            for _ in 0..level {
140                s.push(String::from("  "));
141            }
142            s.push(
143                node.as_ref()
144                    .lock()
145                    .unwrap()
146                    .meta
147                    .as_ref()
148                    .unwrap()
149                    .name
150                    .as_ref()
151                    .map_or_else(|| "".to_string(), |r| r.to_string()),
152            );
153            s.push("\n".to_string());
154        };
155        self.handle_with_level(f);
156        let tree_str = s.join("");
157        println!("{}", tree_str);
158    }
159
160    // TODO
161    pub fn print_in_one_tree(&self) {
162        let mut vnode = TaskNode::new(TryIntoTask::try_into_task(SyncFn(|| println!("root"))));
163        for node in &self.root_tasks {
164            vnode.children.push(node.clone());
165        }
166    }
167
168    pub fn handle<F>(parent: bool, node: DepTaskNode, mut f: F)
169    where
170        F: FnMut(DepTaskNode),
171    {
172        f(node.clone());
173        let task = node.as_ref().lock().unwrap();
174        let list = if parent {
175            &task.parents
176        } else {
177            &task.children
178        };
179        for l in list {
180            f(l.clone());
181        }
182    }
183
184    pub fn handle_with_level<F>(&self, mut f: F)
185    where
186        F: FnMut(DepTaskNode, usize),
187    {
188        for node in &self.root_tasks {
189            DAG::handle_one_with_level(node.clone(), 0, &mut f);
190        }
191    }
192
193    pub(crate) fn handle_one_with_level<F>(node: DepTaskNode, level: usize, f: &mut F)
194    where
195        F: FnMut(DepTaskNode, usize),
196    {
197        f(node.clone(), level);
198        let task = node.as_ref().lock().unwrap();
199        let level = level + 1;
200        for t in &task.children {
201            DAG::handle_one_with_level(t.clone(), level, f)
202        }
203    }
204
205    pub fn cal_task_nums(&self) -> usize {
206        // TODO use better method
207        let mut v = vec![];
208        self.handle_with_level(|t, _| v.push(t));
209        v.len()
210    }
211
212    pub fn to_single_task(mut self) -> Result<WrappedTask> {
213        let t =
214            self.root_tasks.into_iter().next().ok_or_else(|| {
215                BronzeError::msg("DAG is empty, could not transform to single task")
216            })?;
217        let meta = self.meta.take();
218        let lock = Arc::try_unwrap(t)
219            .ok()
220            .ok_or_else(|| BronzeError::msg("Could take ownership from task point in DAG"))?;
221        let task_node = lock
222            .into_inner()
223            .ok()
224            .ok_or_else(|| BronzeError::msg("Could take ownership from locked task in DAG"))?;
225        Ok(WrappedTask::new(task_node.task, meta))
226    }
227}