// parsli 0.1.1
//
// Parallel status lines for Rust
// Documentation
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, Condvar, Mutex, RwLock};

use console::style;
use indicatif::{MultiProgress, ProgressDrawTarget};
use threadpool;

use crate::{Line, LineStatus, Task};
use crate::ctx::CtxMgr;
use crate::dep::{DepState, DepTree};
use crate::task::{TaskId, TaskMgr};

/// The status of the pool
///
/// `start` stores `Busy(None)` and `start_multi` stores `Busy(Some(names))`
/// before the workers are launched.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Status {
    /// The pool is not working.
    Idle,
    /// The pool is working. `Some(names)` restricts the run to the named
    /// subset of tasks; `None` means no subset was specified.
    Busy(Option<Vec<String>>),
}

/// A wrapper for showing progress of multiple dependent tasks
///
/// Every field except `threads` and `status` sits behind an `Arc`, so a clone
/// of the pool shares the same dependency tree, managers and progress bar
/// with the original.
#[derive(Clone)]
pub struct ThreadPool<S> {
    /// Number of worker threads used when the pool is started
    threads: usize,
    /// Status of this pool
    status: Status,
    /// Dependency tree, keyed by task name and shared with the workers
    tree: Arc<Mutex<DepTree<String>>>,
    /// Condition variable used together with the `tree` mutex: workers wait on
    /// it when no task is ready and notify it after a task succeeds or fails
    tree_cond: Arc<Condvar>,
    /// Managers
    /// Task name for each task id
    names: Arc<RwLock<HashMap<TaskId, String>>>,
    /// The registered tasks, looked up by task id
    tasks: Arc<RwLock<TaskMgr<S>>>,
    /// Results of successful tasks, inserted under the task's name
    context: Arc<RwLock<CtxMgr<S>>>,
    /// Multi progress bar
    progress: Arc<MultiProgress>,
}


impl<S> ThreadPool<S> where S: Clone + Send + Sync + 'static {
    /// Creates a new process pool
    pub fn new(threads: usize) -> Self {
        Self {
            threads,
            /// Do nothing initially
            status: Status::Idle,
            /// Empty tree
            tree: Arc::new(Mutex::new(DepTree::new())),
            tree_cond: Arc::new(Default::default()),
            /// Managers
            names: Arc::new(RwLock::new(HashMap::new())),
            tasks: Arc::new(RwLock::new(TaskMgr::new())),
            context: Arc::new(RwLock::new(CtxMgr::new())),
            /// High refresh rate progress bar
            progress: Arc::new(MultiProgress::with_draw_target(
                ProgressDrawTarget::stderr_with_hz(60),
            )),
        }
    }

    /// Registers `task` under `name`, depending on every task named in `deps`.
    ///
    /// The node for `name` is looked up (or created) in the dependency tree,
    /// the name and the task are stored under that node's id, and one edge per
    /// entry of `deps` is then added from `name` to the dependency.
    pub fn add_task(&mut self, name: &str, deps: Vec<&str>, task: Task<S>) {
        let owner = name.to_string();
        // The tree stays locked for the whole registration
        let mut tree = self.tree.lock().unwrap();
        let id = tree.find_or_add_node(owner.clone());
        // Keep the id -> name and id -> task mappings in sync with the tree
        self.names.write().unwrap().insert(id, owner.clone());
        self.tasks.write().unwrap().insert(id, task);
        // Each dependency becomes an edge, which makes the task blocked
        deps.into_iter().for_each(|dep| {
            tree.find_or_add_edge(owner.clone(), dep.to_string());
        });
    }

    /// Starts the pool for the single task called `name`.
    ///
    /// Shorthand for `start_multi` with a one-element list.
    pub fn start_single(&mut self, name: &str) {
        let only = vec![name];
        self.start_multi(only)
    }

    /// Starts the pool for the tasks listed in `names`.
    ///
    /// The status becomes `Busy(Some(names))`, the dependency tree is trimmed
    /// via `retain_dependencies`, and the workers are then run to completion.
    pub fn start_multi(&mut self, names: Vec<&str>) {
        let selected: Vec<String> = names.iter().map(|&n| String::from(n)).collect();
        self.status = Status::Busy(Some(selected));
        // Remove all dependencies that are not required. The lock lives only
        // inside this scope so the workers can take it once they start.
        {
            let mut tree = self.tree.lock().unwrap();
            tree.retain_dependencies(names.iter().map(|&n| n.into()).collect());
        }
        // Start filtered tasks
        self.exec();
    }

    /// Starts the pool without restricting it to a subset of tasks.
    ///
    /// The status becomes `Busy(None)` and the workers are run to completion.
    pub fn start(&mut self) {
        // `None`: no subset of tasks was specified
        let unrestricted = Status::Busy(None);
        self.status = unrestricted;
        self.exec();
    }

    /// Runs the registered tasks and blocks until every worker has finished.
    ///
    /// A fresh `threadpool::ThreadPool` with `self.threads` threads is created
    /// and the same number of worker closures is submitted to it. Each worker
    /// repeatedly claims the next node from the dependency tree, runs its task
    /// and records the outcome, until `find_next_node` returns `Err(())`.
    ///
    /// NOTE(review): if a task panics, its node is left in `DepState::Running`
    /// and nothing notifies `tree_cond` for it; whether the remaining workers
    /// can still terminate depends on `find_next_node` — confirm.
    fn exec(&self) {
        let pool = threadpool::ThreadPool::new(self.threads);
        for _ in 0..self.threads {
            // Every worker gets its own handles to the shared state and its
            // own status line on the shared progress bar
            let tree = self.tree.clone();
            let tree_cond = self.tree_cond.clone();
            let names = self.names.clone();
            let tasks = self.tasks.clone();
            let context = self.context.clone();
            let line = Line::new(self.progress.clone());
            pool.execute(move || {
                // Hide status line while waiting for the next task
                while let Some(id) = line.suspend(|| {
                    // Get the next task to execute from the tree
                    let mut guard = tree.lock().unwrap();
                    loop {
                        match guard.find_next_node() {
                            // A node is available: mark it as running while the
                            // tree lock is still held, then hand it to the worker
                            Ok(Some(id)) => {
                                guard.update_node(id, DepState::Running);
                                return Some(id);
                            }
                            // Nothing available right now: release the lock and
                            // sleep until another worker signals `tree_cond`
                            Ok(None) => { guard = tree_cond.wait(guard).unwrap(); }
                            // Ends this worker's loop (presumably no further
                            // node can become available — confirm in `DepTree`)
                            Err(()) => { return None; }
                        }
                    }
                }) {
                    // Each lock below is dropped before the next one is taken,
                    // so none of them is held while the task itself runs.
                    // Find the task
                    let tasks_lock = tasks.read().unwrap();
                    let task = (*tasks_lock.get(id).unwrap()).clone();
                    drop(tasks_lock);
                    // Find the task dependencies
                    let tree_lock = tree.lock().unwrap();
                    let dep_ids = tree_lock.children(id);
                    drop(tree_lock);
                    // Find the name of the task and the task dependencies
                    let names_lock = names.read().unwrap();
                    let name = (*names_lock.get(&id).unwrap()).clone();
                    let dep_names = dep_ids.iter().map(|id| (*names_lock.get(&id).unwrap()).clone()).collect();
                    drop(names_lock);
                    // Filter context by the names of the dependencies
                    let context_lock = context.read().unwrap();
                    let ctx = context_lock.filter(dep_names);
                    drop(context_lock);
                    // Execute the task
                    line.update_prefix(format!("[{}]", name));
                    line.update_status_and_message(LineStatus::Idle, "waiting...".to_string());
                    match task(line.clone(), name.clone(), ctx) {
                        Ok(s) => {
                            // Insert the result into the context. The write lock
                            // is not dropped explicitly, so it stays held until
                            // the end of this arm, i.e. also while the tree is
                            // updated below.
                            let mut context_lock = context.write().unwrap();
                            context_lock.insert(name, s);
                            // Mark task as a success and wake the waiting workers
                            let mut guard = tree.lock().unwrap();
                            guard.update_node(id, DepState::Success);
                            tree_cond.notify_all();
                            drop(guard);
                        }
                        Err(e) => {
                            // Mark task as a failure and wake the waiting workers
                            let mut guard = tree.lock().unwrap();
                            guard.update_node(id, DepState::Failure);
                            tree_cond.notify_all();
                            drop(guard);
                            // Something went wrong, but we can still do other jobs
                            let prefix = style(format!("[{}]", name)).bold();
                            // NOTE(review): the symbol is an empty string here;
                            // possibly a glyph that was lost — confirm
                            let symbol = style("").red();
                            let message = style(e).white();
                            line.println(format!("{} {} {}", prefix, symbol, message));
                        }
                    };
                }
            });
        }
        // Wait for tasks to finish
        pool.join();
    }

    /// Writes the dependency tree to the file at `path` in dot format.
    ///
    /// The file is created (or truncated) first; the tree is then converted
    /// to text with `to_string()` and written out in one call.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the file cannot be created or written.
    pub fn render(&self, path: String) -> std::io::Result<()> {
        let mut out = File::create(path)?;
        // The tree lock is held only while the text is produced
        let rendered = {
            let tree = self.tree.lock().unwrap();
            tree.to_string()
        };
        out.write_all(rendered.as_bytes())
    }
}