1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::sync::{Arc, Condvar, Mutex, RwLock};
5
6use console::style;
7use indicatif::{MultiProgress, ProgressDrawTarget};
8use threadpool;
9
10use crate::{Line, LineStatus, Task};
11use crate::ctx::CtxMgr;
12use crate::dep::{DepState, DepTree};
13use crate::task::{TaskId, TaskMgr};
14
15#[derive(Clone)]
17pub enum Status {
18 Idle,
20 Busy(Option<Vec<String>>),
22}
23
24#[derive(Clone)]
26pub struct ThreadPool<S> {
27 threads: usize,
29 status: Status,
31 tree: Arc<Mutex<DepTree<String>>>,
33 tree_cond: Arc<Condvar>,
34 names: Arc<RwLock<HashMap<TaskId, String>>>,
36 tasks: Arc<RwLock<TaskMgr<S>>>,
37 context: Arc<RwLock<CtxMgr<S>>>,
38 progress: Arc<MultiProgress>,
40}
41
42
43impl<S> ThreadPool<S> where S: Clone + Send + Sync + 'static {
44 pub fn new(threads: usize) -> Self {
46 Self {
47 threads,
48 status: Status::Idle,
50 tree: Arc::new(Mutex::new(DepTree::new())),
52 tree_cond: Arc::new(Default::default()),
53 names: Arc::new(RwLock::new(HashMap::new())),
55 tasks: Arc::new(RwLock::new(TaskMgr::new())),
56 context: Arc::new(RwLock::new(CtxMgr::new())),
57 progress: Arc::new(MultiProgress::with_draw_target(
59 ProgressDrawTarget::stderr_with_hz(60),
60 )),
61 }
62 }
63
64 pub fn add_task(&mut self, name: &str, deps: Vec<&str>, task: Task<S>) {
66 let mut guard = self.tree.lock().unwrap();
67 let id = guard.find_or_add_node(name.to_string());
68 self.names.write().unwrap().insert(id, name.to_string());
70 self.tasks.write().unwrap().insert(id, task);
71 for dep in deps {
73 guard.find_or_add_edge(name.to_string(), dep.to_string());
74 }
75 }
76
77 pub fn start_single(&mut self, name: &str) {
79 self.start_multi(vec![name])
80 }
81
82 pub fn start_multi(&mut self, names: Vec<&str>) {
84 self.status = Status::Busy(Some(names.iter().map(|s| s.to_string()).collect()));
85 let mut tree_lock = self.tree.lock().unwrap();
87 tree_lock.retain_dependencies(names.iter().map(|&s| s.into()).collect());
88 drop(tree_lock);
89 self.exec();
91 }
92
93 pub fn start(&mut self) {
95 self.status = Status::Busy(None);
96 self.exec()
97 }
98
99 fn exec(&self) {
100 let pool = threadpool::ThreadPool::new(self.threads);
101 for _ in 0..self.threads {
102 let tree = self.tree.clone();
103 let tree_cond = self.tree_cond.clone();
104 let names = self.names.clone();
105 let tasks = self.tasks.clone();
106 let context = self.context.clone();
107 let line = Line::new(self.progress.clone());
108 pool.execute(move || {
109 while let Some(id) = line.suspend(|| {
111 let mut guard = tree.lock().unwrap();
113 loop {
114 match guard.find_next_node() {
115 Ok(Some(id)) => {
116 guard.update_node(id, DepState::Running);
117 return Some(id);
118 }
119 Ok(None) => { guard = tree_cond.wait(guard).unwrap(); }
120 Err(()) => { return None; }
121 }
122 }
123 }) {
124 let tasks_lock = tasks.read().unwrap();
126 let task = (*tasks_lock.get(id).unwrap()).clone();
127 drop(tasks_lock);
128 let tree_lock = tree.lock().unwrap();
130 let dep_ids = tree_lock.children(id);
131 drop(tree_lock);
132 let names_lock = names.read().unwrap();
134 let name = (*names_lock.get(&id).unwrap()).clone();
135 let dep_names = dep_ids.iter().map(|id| (*names_lock.get(&id).unwrap()).clone()).collect();
136 drop(names_lock);
137 let context_lock = context.read().unwrap();
139 let ctx = context_lock.filter(dep_names);
140 drop(context_lock);
141 line.update_prefix(format!("[{}]", name));
143 line.update_status_and_message(LineStatus::Idle, "waiting...".to_string());
144 match task(line.clone(), name.clone(), ctx) {
145 Ok(s) => {
146 let mut context_lock = context.write().unwrap();
148 context_lock.insert(name, s);
149 let mut guard = tree.lock().unwrap();
151 guard.update_node(id, DepState::Success);
152 tree_cond.notify_all();
153 drop(guard);
154 }
155 Err(e) => {
156 let mut guard = tree.lock().unwrap();
158 guard.update_node(id, DepState::Failure);
159 tree_cond.notify_all();
160 drop(guard);
161 let prefix = style(format!("[{}]", name)).bold();
163 let symbol = style("✘").red();
164 let message = style(e).white();
165 line.println(format!("{} {} {}", prefix, symbol, message));
166 }
167 };
168 }
169 });
170 }
171 pool.join();
173 }
174
175 pub fn render(&self, path: String) -> std::io::Result<()> {
177 let mut file = File::create(path)?;
178 let guard = self.tree.lock().unwrap();
179 let dot = guard.to_string();
180 drop(guard);
181 file.write_all(dot.as_bytes())?;
182 Ok(())
183 }
184}
185