use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, Condvar, Mutex, RwLock};
use console::style;
use indicatif::{MultiProgress, ProgressDrawTarget};
use threadpool;
use crate::{Line, LineStatus, Task};
use crate::ctx::CtxMgr;
use crate::dep::{DepState, DepTree};
use crate::task::{TaskId, TaskMgr};
#[derive(Clone)]
pub enum Status {
Idle,
Busy(Option<Vec<String>>),
}
#[derive(Clone)]
pub struct ThreadPool<S> {
threads: usize,
status: Status,
tree: Arc<Mutex<DepTree<String>>>,
tree_cond: Arc<Condvar>,
names: Arc<RwLock<HashMap<TaskId, String>>>,
tasks: Arc<RwLock<TaskMgr<S>>>,
context: Arc<RwLock<CtxMgr<S>>>,
progress: Arc<MultiProgress>,
}
impl<S> ThreadPool<S> where S: Clone + Send + Sync + 'static {
pub fn new(threads: usize) -> Self {
Self {
threads,
status: Status::Idle,
tree: Arc::new(Mutex::new(DepTree::new())),
tree_cond: Arc::new(Default::default()),
names: Arc::new(RwLock::new(HashMap::new())),
tasks: Arc::new(RwLock::new(TaskMgr::new())),
context: Arc::new(RwLock::new(CtxMgr::new())),
progress: Arc::new(MultiProgress::with_draw_target(
ProgressDrawTarget::stderr_with_hz(60),
)),
}
}
pub fn add_task(&mut self, name: &str, deps: Vec<&str>, task: Task<S>) {
let mut guard = self.tree.lock().unwrap();
let id = guard.find_or_add_node(name.to_string());
self.names.write().unwrap().insert(id, name.to_string());
self.tasks.write().unwrap().insert(id, task);
for dep in deps {
guard.find_or_add_edge(name.to_string(), dep.to_string());
}
}
pub fn start_single(&mut self, name: &str) {
self.start_multi(vec![name])
}
pub fn start_multi(&mut self, names: Vec<&str>) {
self.status = Status::Busy(Some(names.iter().map(|s| s.to_string()).collect()));
let mut tree_lock = self.tree.lock().unwrap();
tree_lock.retain_dependencies(names.iter().map(|&s| s.into()).collect());
drop(tree_lock);
self.exec();
}
pub fn start(&mut self) {
self.status = Status::Busy(None);
self.exec()
}
fn exec(&self) {
let pool = threadpool::ThreadPool::new(self.threads);
for _ in 0..self.threads {
let tree = self.tree.clone();
let tree_cond = self.tree_cond.clone();
let names = self.names.clone();
let tasks = self.tasks.clone();
let context = self.context.clone();
let line = Line::new(self.progress.clone());
pool.execute(move || {
while let Some(id) = line.suspend(|| {
let mut guard = tree.lock().unwrap();
loop {
match guard.find_next_node() {
Ok(Some(id)) => {
guard.update_node(id, DepState::Running);
return Some(id);
}
Ok(None) => { guard = tree_cond.wait(guard).unwrap(); }
Err(()) => { return None; }
}
}
}) {
let tasks_lock = tasks.read().unwrap();
let task = (*tasks_lock.get(id).unwrap()).clone();
drop(tasks_lock);
let tree_lock = tree.lock().unwrap();
let dep_ids = tree_lock.children(id);
drop(tree_lock);
let names_lock = names.read().unwrap();
let name = (*names_lock.get(&id).unwrap()).clone();
let dep_names = dep_ids.iter().map(|id| (*names_lock.get(&id).unwrap()).clone()).collect();
drop(names_lock);
let context_lock = context.read().unwrap();
let ctx = context_lock.filter(dep_names);
drop(context_lock);
line.update_prefix(format!("[{}]", name));
line.update_status_and_message(LineStatus::Idle, "waiting...".to_string());
match task(line.clone(), name.clone(), ctx) {
Ok(s) => {
let mut context_lock = context.write().unwrap();
context_lock.insert(name, s);
let mut guard = tree.lock().unwrap();
guard.update_node(id, DepState::Success);
tree_cond.notify_all();
drop(guard);
}
Err(e) => {
let mut guard = tree.lock().unwrap();
guard.update_node(id, DepState::Failure);
tree_cond.notify_all();
drop(guard);
let prefix = style(format!("[{}]", name)).bold();
let symbol = style("✘").red();
let message = style(e).white();
line.println(format!("{} {} {}", prefix, symbol, message));
}
};
}
});
}
pool.join();
}
pub fn render(&self, path: String) -> std::io::Result<()> {
let mut file = File::create(path)?;
let guard = self.tree.lock().unwrap();
let dot = guard.to_string();
drop(guard);
file.write_all(dot.as_bytes())?;
Ok(())
}
}