thepipelinetool 0.2.7

readers used for thepipelinetool
Documentation
use std::{sync::mpsc::channel, thread};

use thepipelinetool_runner::{
    backend::Backend, blanket_backend::BlanketBackend, in_memory_backend::InMemoryBackend,
};

pub fn run_in_memory(backend: &mut InMemoryBackend, max_parallelism: usize, tpt_path: String) {
    let (tx, rx) = channel();
    let mut current_parallel_tasks_count = 0;

    for _ in 0..max_parallelism {
        if let Some(temp_queued_task) = backend.pop_priority_queue().unwrap() {
            let tx = tx.clone();
            let mut backend = backend.clone();
            let tpt_path = tpt_path.clone();

            thread::spawn(move || {
                backend.work(&temp_queued_task, tpt_path).unwrap();
                backend.remove_from_temp_queue(&temp_queued_task).unwrap();
                tx.send(()).unwrap();
            });

            current_parallel_tasks_count += 1;
            if current_parallel_tasks_count >= max_parallelism {
                break;
            }
        } else {
            break;
        }
    }

    if current_parallel_tasks_count == 0 {
        drop(tx);
        return;
    }

    for _ in rx.iter() {
        current_parallel_tasks_count -= 1;

        if let Some(temp_queued_task) = backend.pop_priority_queue().unwrap() {
            let tx = tx.clone();
            let mut backend = backend.clone();
            let tpt_path = tpt_path.clone();

            thread::spawn(move || {
                backend.work(&temp_queued_task, tpt_path).unwrap();
                backend.remove_from_temp_queue(&temp_queued_task).unwrap();
                tx.send(()).unwrap();
            });
            current_parallel_tasks_count += 1;

            if current_parallel_tasks_count >= max_parallelism {
                continue;
            }
        }
        if current_parallel_tasks_count == 0 {
            drop(tx);
            break;
        }
    }
}