parsli/
pool.rs

1use std::collections::HashMap;
2use std::fs::File;
3use std::io::Write;
4use std::sync::{Arc, Condvar, Mutex, RwLock};
5
6use console::style;
7use indicatif::{MultiProgress, ProgressDrawTarget};
8use threadpool;
9
10use crate::{Line, LineStatus, Task};
11use crate::ctx::CtxMgr;
12use crate::dep::{DepState, DepTree};
13use crate::task::{TaskId, TaskMgr};
14
/// The status of the pool.
///
/// `ThreadPool::new` starts out as [`Status::Idle`]; `ThreadPool::start` and
/// `ThreadPool::start_multi` switch the pool to [`Status::Busy`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Status {
    /// The pool is not working.
    Idle,
    /// The pool is working.
    ///
    /// `Some(names)` holds the task names that were explicitly requested,
    /// `None` means that no subset was specified.
    Busy(Option<Vec<String>>),
}
23
24/// A wrapper for showing progress of multiple dependent tasks
25#[derive(Clone)]
26pub struct ThreadPool<S> {
27    /// Number of processes
28    threads: usize,
29    /// Status of this pool
30    status: Status,
31    /// Dependency tree
32    tree: Arc<Mutex<DepTree<String>>>,
33    tree_cond: Arc<Condvar>,
34    /// Managers
35    names: Arc<RwLock<HashMap<TaskId, String>>>,
36    tasks: Arc<RwLock<TaskMgr<S>>>,
37    context: Arc<RwLock<CtxMgr<S>>>,
38    /// Multi progress bar
39    progress: Arc<MultiProgress>,
40}
41
42
43impl<S> ThreadPool<S> where S: Clone + Send + Sync + 'static {
44    /// Creates a new process pool
45    pub fn new(threads: usize) -> Self {
46        Self {
47            threads,
48            /// Do nothing initially
49            status: Status::Idle,
50            /// Empty tree
51            tree: Arc::new(Mutex::new(DepTree::new())),
52            tree_cond: Arc::new(Default::default()),
53            /// Managers
54            names: Arc::new(RwLock::new(HashMap::new())),
55            tasks: Arc::new(RwLock::new(TaskMgr::new())),
56            context: Arc::new(RwLock::new(CtxMgr::new())),
57            /// High refresh rate progress bar
58            progress: Arc::new(MultiProgress::with_draw_target(
59                ProgressDrawTarget::stderr_with_hz(60),
60            )),
61        }
62    }
63
64    /// Add a new task
65    pub fn add_task(&mut self, name: &str, deps: Vec<&str>, task: Task<S>) {
66        let mut guard = self.tree.lock().unwrap();
67        let id = guard.find_or_add_node(name.to_string());
68        // Update mappings
69        self.names.write().unwrap().insert(id, name.to_string());
70        self.tasks.write().unwrap().insert(id, task);
71        // Make blocked if there are dependencies
72        for dep in deps {
73            guard.find_or_add_edge(name.to_string(), dep.to_string());
74        }
75    }
76
77    /// Start the pool for a single task
78    pub fn start_single(&mut self, name: &str) {
79        self.start_multi(vec![name])
80    }
81
82    /// Start the pool for multiple tasks
83    pub fn start_multi(&mut self, names: Vec<&str>) {
84        self.status = Status::Busy(Some(names.iter().map(|s| s.to_string()).collect()));
85        // Remove all dependencies that are not required
86        let mut tree_lock = self.tree.lock().unwrap();
87        tree_lock.retain_dependencies(names.iter().map(|&s| s.into()).collect());
88        drop(tree_lock);
89        // Start filtered tasks
90        self.exec();
91    }
92
93    /// Start all tasks
94    pub fn start(&mut self) {
95        self.status = Status::Busy(None);
96        self.exec()
97    }
98
99    fn exec(&self) {
100        let pool = threadpool::ThreadPool::new(self.threads);
101        for _ in 0..self.threads {
102            let tree = self.tree.clone();
103            let tree_cond = self.tree_cond.clone();
104            let names = self.names.clone();
105            let tasks = self.tasks.clone();
106            let context = self.context.clone();
107            let line = Line::new(self.progress.clone());
108            pool.execute(move || {
109                // Hide status line while waiting for the next task
110                while let Some(id) = line.suspend(|| {
111                    // Get the next task to execute from the tree
112                    let mut guard = tree.lock().unwrap();
113                    loop {
114                        match guard.find_next_node() {
115                            Ok(Some(id)) => {
116                                guard.update_node(id, DepState::Running);
117                                return Some(id);
118                            }
119                            Ok(None) => { guard = tree_cond.wait(guard).unwrap(); }
120                            Err(()) => { return None; }
121                        }
122                    }
123                }) {
124                    // Find the task
125                    let tasks_lock = tasks.read().unwrap();
126                    let task = (*tasks_lock.get(id).unwrap()).clone();
127                    drop(tasks_lock);
128                    // Find the task dependencies
129                    let tree_lock = tree.lock().unwrap();
130                    let dep_ids = tree_lock.children(id);
131                    drop(tree_lock);
132                    // Find the name of the task and the task dependencies
133                    let names_lock = names.read().unwrap();
134                    let name = (*names_lock.get(&id).unwrap()).clone();
135                    let dep_names = dep_ids.iter().map(|id| (*names_lock.get(&id).unwrap()).clone()).collect();
136                    drop(names_lock);
137                    // Filter context
138                    let context_lock = context.read().unwrap();
139                    let ctx = context_lock.filter(dep_names);
140                    drop(context_lock);
141                    // Execute the task
142                    line.update_prefix(format!("[{}]", name));
143                    line.update_status_and_message(LineStatus::Idle, "waiting...".to_string());
144                    match task(line.clone(), name.clone(), ctx) {
145                        Ok(s) => {
146                            // Insert the result into the context
147                            let mut context_lock = context.write().unwrap();
148                            context_lock.insert(name, s);
149                            // Mark task as a success
150                            let mut guard = tree.lock().unwrap();
151                            guard.update_node(id, DepState::Success);
152                            tree_cond.notify_all();
153                            drop(guard);
154                        }
155                        Err(e) => {
156                            // Mark task as a failure
157                            let mut guard = tree.lock().unwrap();
158                            guard.update_node(id, DepState::Failure);
159                            tree_cond.notify_all();
160                            drop(guard);
161                            // Something went wrong, but we can still do other jobs
162                            let prefix = style(format!("[{}]", name)).bold();
163                            let symbol = style("✘").red();
164                            let message = style(e).white();
165                            line.println(format!("{} {} {}", prefix, symbol, message));
166                        }
167                    };
168                }
169            });
170        }
171        // Wait for tasks to finish
172        pool.join();
173    }
174
175    // Write tree to file in dot format
176    pub fn render(&self, path: String) -> std::io::Result<()> {
177        let mut file = File::create(path)?;
178        let guard = self.tree.lock().unwrap();
179        let dot = guard.to_string();
180        drop(guard);
181        file.write_all(dot.as_bytes())?;
182        Ok(())
183    }
184}
185