thread-manager 1.0.0

A streamlined Rust library for efficient thread pooling and parallel job execution, designed for simplicity, flexibility, and performance.
Documentation
use std::cell::Cell;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;

use crossbeam_channel::RecvError;

use crate::channel::JobChannel;
use crate::channel::MessageKind;
use crate::channel::ResultChannel;

use crate::status::ManagerStatus;
use crate::status::WorkerStatus;

use crate::order::LOAD_ORDER;
use crate::order::STORE_ORDER;

pub struct ThreadWorker<F, T>
where
    F: Fn() -> T + Send + 'static,
{
    thread: Cell<Option<thread::JoinHandle<()>>>,
    worker: Arc<Worker<F, T>>,
}

impl<F, T> ThreadWorker<F, T>
where
    F: Fn() -> T + Send + 'static,
    T: Send + 'static,
{
    pub fn new(
        id: usize,
        job_channel: Arc<JobChannel<F>>,
        result_channel: Arc<ResultChannel<T>>,
        manager_status: Arc<ManagerStatus>,
    ) -> Self {
        let thread: Cell<Option<thread::JoinHandle<()>>> = Cell::new(None);
        let worker: Arc<Worker<F, T>> =
            Worker::new(id, job_channel, result_channel, manager_status);

        Self { thread, worker }
    }

    pub fn start(&self) {
        if !self.worker.status().is_active() {
            self.worker.set_active(true);
            self.worker.set_termination(false);
            self.create_thread();
        }
    }

    pub fn send(&self, job: F) {
        self.worker.send_job(job);
        self.start();
    }
}

impl<F, T> ThreadWorker<F, T>
where
    F: Fn() -> T + Send + 'static,
    T: Send + 'static,
{
    pub fn id(&self) -> usize {
        self.worker.id()
    }

    pub fn status(&self) -> &WorkerStatus {
        self.worker.status()
    }

    pub fn job_channel(&self) -> &Arc<JobChannel<F>> {
        self.worker.job_channel()
    }

    pub fn join(&self) {
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }

    pub fn set_termination(&self, state: bool) {
        self.worker.set_termination(state)
    }

    pub fn send_release(&self) {
        self.worker.send_release()
    }
}

impl<F, T> ThreadWorker<F, T>
where
    F: Fn() -> T + Send + 'static,
    T: Send + 'static,
{
    fn create_thread(&self) {
        let worker: Arc<Worker<F, T>> = self.worker.clone();
        let thread: thread::JoinHandle<()> = thread::spawn(worker.create());
        self.thread.set(Some(thread));
    }
}

pub struct Worker<F, T>
where
    F: Fn() -> T + Send + 'static,
{
    id: usize,
    termination: AtomicBool,
    job_channel: Arc<JobChannel<F>>,
    result_channel: Arc<ResultChannel<T>>,
    manager_status: Arc<ManagerStatus>,
    worker_status: WorkerStatus,
}

impl<F, T> Worker<F, T>
where
    F: Fn() -> T + Send + 'static,
    T: Send + 'static,
{
    pub fn new(
        id: usize,
        job_channel: Arc<JobChannel<F>>,
        result_channel: Arc<ResultChannel<T>>,
        manager_status: Arc<ManagerStatus>,
    ) -> Arc<Worker<F, T>> {
        let termination: AtomicBool = AtomicBool::new(false);
        let worker_status: WorkerStatus = WorkerStatus::new();

        Arc::new(Self {
            id,
            termination,
            job_channel,
            result_channel,
            manager_status,
            worker_status,
        })
    }

    pub fn id(self: &Arc<Self>) -> usize {
        self.id
    }

    pub fn send_job(self: &Arc<Self>, job: F) {
        self.job_channel
            .send(job)
            .expect(&format!("Failed to send job to Worker [{}]", self.id()));
    }

    pub fn create(self: Arc<Self>) -> impl Fn() {
        let worker = move || {
            self.worker();
            self.set_active(false);
            self.set_termination(false);
        };
        worker
    }

    pub fn status<'a>(self: &'a Arc<Self>) -> &'a WorkerStatus {
        &self.worker_status
    }

    pub fn job_channel<'a>(self: &'a Arc<Self>) -> &'a Arc<JobChannel<F>> {
        &self.job_channel
    }

    pub fn set_active(self: &Arc<Self>, state: bool) {
        self.worker_status.set_active(state);
        self.manager_status.set_active(state);
    }

    pub fn set_waiting(self: &Arc<Self>, state: bool) {
        self.worker_status.set_waiting(state);
        self.manager_status.set_waiting(state);
    }

    pub fn set_busy(self: &Arc<Self>, state: bool) {
        self.worker_status.set_busy(state);
        self.manager_status.set_busy(state);
    }

    pub fn set_termination(self: &Arc<Self>, state: bool) {
        self.termination.store(state, STORE_ORDER);
    }

    pub fn send_release(self: &Arc<Self>) {
        self.job_channel
            .send_release()
            .expect(&format!("Failed to release Worker [{}]", self.id()));
    }
}

impl<F, T> Worker<F, T>
where
    F: Fn() -> T + Send + 'static,
    T: Send + 'static,
{
    fn worker(self: &Arc<Self>) {
        while !self.termination.load(LOAD_ORDER) {
            let recv: Result<MessageKind<F>, RecvError> = self.wait_for_recv();

            if let Ok(message) = recv {
                match message {
                    MessageKind::Job(job) => self.job_recv(job),
                    MessageKind::Release => break,
                }
            }
        }
    }

    fn wait_for_recv(self: &Arc<Self>) -> Result<MessageKind<F>, RecvError> {
        self.set_waiting(true);
        let recv: Result<MessageKind<F>, RecvError> = self.job_channel.recv();
        self.set_waiting(false);
        recv
    }

    fn job_recv(self: &Arc<Self>, job: F) {
        self.worker_status.add_received();
        self.set_busy(true);

        self.result_channel
            .send(job())
            .expect("Failed to send result");

        self.job_channel.status().set_concluded(true);
        self.set_busy(false);
    }
}