blackhole 0.20.2

...to throw your threads into.
Documentation
/*
==--==--==--==--==--==--==--==--==--==--==--==--==--==--==--==--

Black Hole

Copyright (C) 2019-2020, 2022-2023  Anonymous

There are several releases over multiple years,
they are listed as ranges, such as: "2019-2020".

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

::--::--::--::--::--::--::--::--::--::--::--::--::--::--::--::--
*/

//! # Root

use {
    std::{
        io::{Error, ErrorKind},
        sync::{
            Arc,
            Mutex,
            mpsc::{self, Receiver, RecvTimeoutError, SendError, Sender, SyncSender, TrySendError},
        },
        thread::{self, JoinHandle},
        time::Duration,
    },
    crate::{Job, Result},
};

/// # Active limit
///
/// Maximum number of jobs a [`BlackHole`] keeps running at the same time.
///
/// This is deliberately a different type from the queue limit (`usize`), so the two arguments of
/// `BlackHole::make_with_active_limit()` cannot be swapped by accident.
pub type ActiveLimit = u32;

/// # Join handle of the job manager thread, as returned by `spawn_job_manager()`
type JobManagerHandle = JoinHandle<()>;
/// # Join handle of one job runner thread, as returned by `spawn_job_runner()`
type JobRunnerHandle = JoinHandle<()>;
/// # Liveness token shared between the job manager and one job runner
///
/// The job manager keeps one clone in its list of active jobs and gives another clone to the runner. The runner hands its clone back via
/// `Msg::JobFinished`. The manager inspects the reference counts of its own clone to decide whether the runner is still alive.
type JobHandle = Arc<Mutex<()>>;

/// # Message
///
/// Everything the job manager thread can be told. Messages come from [`BlackHole`] (new jobs, stop requests) and from job runner threads
/// (asking for more work, reporting completion).
#[derive(Debug)]
enum Msg<J> where J: Job {

    /// # New job
    ///
    /// Sent by `BlackHole::throw()`.
    NewJob {

        /// # Job
        job: J,

        /// # Job sender to be sent back by job manager
        ///
        /// - If new job is accepted, job manager sends back `None`.
        /// - If new job is not accepted, job manager sends back the original job.
        sender: Sender<Option<J>>,

    },

    /// # Request for the next queued job
    ///
    /// Sent by a job runner after it has finished its current job.
    GetJob {

        /// # Sender for job manager to send back result
        ///
        /// The job manager replies with a job taken from its queue, or `None` if the queue is empty.
        sender: Sender<Option<J>>,

    },

    /// # To notify job manager that a job has been finished
    ///
    /// Sent by a job runner right before its thread ends.
    JobFinished {

        /// # Job handle
        ///
        /// The runner's clone of the handle. The job manager drops it, which lets the matching entry be removed from the list of active
        /// jobs.
        handle: JobHandle,

    },

    /// # Stop job manager
    ///
    /// The job manager leaves its loop immediately, no matter how many jobs are active or queued.
    StopJobManager,

    /// # Stop job manager on idle
    ///
    /// The job manager leaves its loop as soon as there are no active jobs and no queued jobs.
    StopJobManagerOnIdle,

}

/// # Black Hole
///
/// A handle to a background job manager thread. Jobs are handed over with `throw()`; the manager runs a limited number of them at the
/// same time and keeps the others in a queue.
///
/// ## Usage
///
/// You _should_ call [`escape_on_idle()`][::escape_on_idle()] when done with this.
///
/// Be careful when you manually call `drop()` on a black hole, or simply let it go out of scope. Because _background threads are
/// left unmanaged_.
/// They will keep running until main thread exits. Then they will be cancelled/discarded by the system.
///
/// ## Notes
///
/// Messages printed by background threads are prefixed with [`TAG`][crate::TAG].
///
/// [::escape_on_idle()]: struct.BlackHole.html#method.escape_on_idle
/// [crate::TAG]: constant.TAG.html
#[derive(Debug)]
pub struct BlackHole<J> where J: Job {

    /// # Job manager handle
    ///
    /// Join handle of the job manager thread; joined by `escape_on_idle()`.
    job_manager: JobManagerHandle,

    /// # Job sender
    ///
    /// Sending side of the bounded channel to the job manager. The bound of the channel is the queue limit.
    job_sender: SyncSender<Msg<J>>,

}

impl<J> BlackHole<J> where J: Job {

    /// # Makes new instance with default active limit
    ///
    /// ## Notes
    ///
    /// - Default active limit is the value of [`available_parallelism()`][fn:available_parallelism].
    /// - See [`make_with_active_limit()`][fn:#make_with_active_limit] for more details.
    ///
    /// ## Errors
    ///
    /// An error is returned if the default active limit cannot be obtained, or if
    /// [`make_with_active_limit()`][fn:#make_with_active_limit] fails.
    ///
    /// [fn:available_parallelism]: fn.available_parallelism.html
    /// [fn:#make_with_active_limit]: #method.make_with_active_limit
    pub fn make(queue_limit: usize) -> Result<Self> {
        Self::make_with_active_limit(available_parallelism()?, queue_limit)
    }

    /// # Makes new instance
    ///
    /// ## Notes
    ///
    /// - Active limit and queue limit are declared in different types to help prevent typos.
    /// - An error is returned if either active limit or queue limit is zero.
    /// - Internal vectors used for jobs will be made immediately with capacities of active limit and queue limit.
    /// - Queue limit is also used as the bound of the channel to the job manager.
    /// - A warning is printed if queue limit does not fit in 32 bits.
    ///
    /// ## Errors
    ///
    /// - [`ErrorKind::InvalidData`] if active limit or queue limit is zero.
    /// - Any error returned while spawning the job manager.
    pub fn make_with_active_limit(active_limit: ActiveLimit, queue_limit: usize) -> Result<Self> {
        if active_limit == 0 {
            return Err(Error::new(ErrorKind::InvalidData, "Active limit must be larger than zero"));
        }

        if queue_limit == 0 {
            return Err(Error::new(ErrorKind::InvalidData, "Queue limit must be larger than zero"));
        }
        // "Too large" means 2^32 or more, i.e. the value does not fit in a u32. Checking it through a conversion works on every pointer
        // width: computing 2^32 as a usize overflows on targets whose pointers are 32 bits wide or narrower.
        if u32::try_from(queue_limit).is_err() {
            crate::io::lock_write_err(__!("Warning: maybe queue limit is too large? -> {}\n", queue_limit));
        }

        let (sender, receiver) = mpsc::sync_channel::<Msg<J>>(queue_limit);
        Ok(Self {
            // The job manager gets its own clone of the sender; it passes further clones on to job runners.
            job_manager: spawn_job_manager(receiver, sender.clone(), active_limit, queue_limit)?,
            job_sender: sender,
        })
    }

    /// # Throws new job into the black hole
    ///
    /// - If the job is accepted, `None` is returned.
    /// - If the job is not accepted, it is returned to you for recovery.
    ///
    /// ## Notes
    ///
    /// - If the channel to the job manager is full, the job is returned immediately.
    /// - Otherwise this function blocks until the job manager answers.
    ///
    /// ## Errors
    ///
    /// - [`ErrorKind::BrokenPipe`] if the job manager is disconnected, or goes away before answering.
    pub fn throw(&self, job: J) -> Result<Option<J>> {
        // One-shot channel, used by the job manager to answer this very request
        let (sender, receiver) = mpsc::channel();
        match self.job_sender.try_send(Msg::NewJob { job, sender }) {
            Ok(()) => receiver.recv().map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed waiting for job manager: {}", e))),
            // The channel hands back the unsent message, so the job can be recovered from it
            Err(TrySendError::Full(Msg::NewJob { job, .. })) => Ok(Some(job)),
            // Only `Msg::NewJob` is sent above, other variants are not expected here
            Err(TrySendError::Full(_)) => Err(Error::new(ErrorKind::Other, __!("Internal error"))),
            Err(TrySendError::Disconnected(_)) => Err(Error::new(ErrorKind::BrokenPipe, "Job manager is disconnected")),
        }
    }

    /// # Escapes the black hole
    ///
    /// ## Notes
    ///
    /// - New jobs and all waiting jobs are discarded.
    /// - Active jobs are left untouched. They _will_ be discarded by system when the program's main thread exits.
    /// - This function does not wait for the job manager to stop.
    ///
    /// ## Safety
    ///
    /// No memory-safety requirement is visible in this function. It is presumably marked `unsafe` so that callers acknowledge that
    /// waiting jobs are dropped and active jobs are abandoned. Only call this if losing that work is acceptable.
    ///
    /// ## Errors
    ///
    /// - [`ErrorKind::BrokenPipe`] if the stop-message cannot be sent to the job manager.
    pub unsafe fn escape(self) -> Result<()> {
        self.job_sender.send(Msg::StopJobManager)
            .map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-message to job manager: {}", e)))
    }

    /// # Escapes the black hole on idle and waits for it
    ///
    /// The job manager is asked to stop as soon as there are no active jobs and no waiting jobs. This function blocks until the job
    /// manager thread has ended.
    ///
    /// ## Errors
    ///
    /// - [`ErrorKind::BrokenPipe`] if the message cannot be sent to the job manager.
    /// - [`ErrorKind::Other`] if joining the job manager thread fails.
    pub fn escape_on_idle(self) -> Result<()> {
        match self.job_sender.send(Msg::StopJobManagerOnIdle) {
            Ok(()) => {
                // This side will send nothing more, release the sender before waiting
                drop(self.job_sender);
                self.job_manager.join().map_err(|err|
                    Error::new(ErrorKind::Other, format!("Failed to wait for job manager to finished: {:?}", err))
                )
            },
            Err(err) => Err(Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-on-idle message to job manager: {}", err))),
        }
    }

}

/// # Spawns job manager
///
/// The job manager is a background thread which owns the list of active jobs and the queue of waiting jobs. It reacts to messages
/// received from `job_manager_receiver`:
///
/// - `Msg::NewJob`: runs the job right away if there is a free active slot, else queues it if the queue has room, else sends it back.
/// - `Msg::GetJob`: answers a job runner with a waiting job, or `None`.
/// - `Msg::JobFinished`: releases the slot of the finished runner and fills free slots from the queue.
/// - `Msg::StopJobManager`: stops immediately.
/// - `Msg::StopJobManagerOnIdle`: stops as soon as nothing is active and nothing is waiting.
///
/// Every 50 milliseconds without a message, the manager removes job runners which are gone and fills free slots from the queue.
///
/// ## Parameters
///
/// - `job_manager_receiver`: where messages come from.
/// - `job_manager_sender`: sending side of the same channel; a clone is given to every job runner.
/// - `active_limit`: maximum number of job runners at the same time.
/// - `queue_limit`: maximum number of waiting jobs.
///
/// ## Errors
///
/// An error is returned if `active_limit` cannot be converted to `usize`.
fn spawn_job_manager<J>(
    job_manager_receiver: Receiver<Msg<J>>, job_manager_sender: SyncSender<Msg<J>>, active_limit: ActiveLimit, queue_limit: usize,
) -> Result<JobManagerHandle> where J: Job {
    let active_limit = usize::try_from(active_limit)
        .map_err(|_| Error::new(ErrorKind::Other, format!("Failed to convert active limit to usize: {}", active_limit)))?;
    Ok(thread::spawn(move || {
        let mut active_jobs = Vec::with_capacity(active_limit);
        let mut job_queue = Vec::with_capacity(queue_limit);
        let mut stop_on_idle = false;

        // Keeps only handles which are still shared with a job runner. When the manager's clone is the last one left, the runner has
        // either handed its clone back (`Msg::JobFinished`) or ended without doing so.
        let update_active_jobs = |jobs: &mut Vec<JobHandle>| jobs.retain(|j| if j.is_poisoned() {
            crate::io::lock_write_err(__!("Found 1 panicked job\n"));
            false
        } else {
            Arc::weak_count(j) > 1 || Arc::strong_count(j) > 1
        });
        // Starts a runner for given job and registers its handle as active
        let spawn_job = |job, jobs: &mut Vec<_>| {
            let handle = Arc::new(Mutex::new(()));
            jobs.push(handle.clone());
            spawn_job_runner(job_manager_sender.clone(), job, handle);
        };
        // Moves waiting jobs into free active slots, until either the slots or the queue run out
        let fill_free_slots = |jobs: &mut Vec<JobHandle>, queue: &mut Vec<J>| {
            while jobs.len() < active_limit {
                match queue.pop() {
                    Some(job) => spawn_job(job, &mut *jobs),
                    None => break,
                }
            }
        };

        loop {
            match job_manager_receiver.recv_timeout(Duration::from_millis(50)) {
                Ok(Msg::NewJob { job, sender }) => {
                    let job = if active_jobs.len() < active_limit {
                        spawn_job(job, &mut active_jobs);
                        None
                    } else if job_queue.len() < queue_limit {
                        job_queue.push(job);
                        None
                    } else {
                        Some(job)
                    };
                    if let Err(err) = sender.send(job) {
                        crate::io::lock_write_err(__!("Failed sending back job-message to BlackHole instance: {}\n", err));
                    }
                },
                // If the runner is gone, the job comes back inside the error: put it back into the queue
                Ok(Msg::GetJob { sender }) => if let Err(SendError(job)) = sender.send(job_queue.pop()) {
                    if let Some(job) = job {
                        job_queue.push(job);
                    }
                },
                Ok(Msg::JobFinished { handle }) => {
                    // Drop the runner's clone first, so that its entry is seen as finished
                    drop(handle);
                    update_active_jobs(&mut active_jobs);
                    fill_free_slots(&mut active_jobs, &mut job_queue);
                    if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Ok(Msg::StopJobManager) => {
                    crate::io::lock_write_err(__!(
                        "Warning: escaping BlackHole while there are {} active job(s) and unknown jobs in queue\n", active_jobs.len(),
                    ));
                    break;
                },
                Ok(Msg::StopJobManagerOnIdle) => {
                    stop_on_idle = true;
                    if active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Err(RecvTimeoutError::Timeout) => {
                    update_active_jobs(&mut active_jobs);
                    // A runner which ended without `Msg::JobFinished` (for example: its thread panicked) never asks for the next job.
                    // Waiting jobs must be started here, or they stay in the queue and stop-on-idle can never be reached.
                    fill_free_slots(&mut active_jobs, &mut job_queue);
                    if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Err(RecvTimeoutError::Disconnected) => break,
            };
        }
    }))
}

/// # Spawns job runner
///
/// A job runner is a background thread which runs given job to its end, then keeps asking the job manager for waiting jobs and runs them
/// too. It stops when the job manager has no more jobs for it, or when talking to the job manager fails.
///
/// Right before the thread ends, `handle` is sent back to the job manager via `Msg::JobFinished`.
fn spawn_job_runner<J>(job_manager_sender: SyncSender<Msg<J>>, mut job: J, handle: JobHandle) -> JobRunnerHandle where J: Job {
    thread::spawn(move || {
        // Private channel, used by the job manager to answer requests of this runner
        let (reply_sender, reply_receiver) = mpsc::channel();
        loop {
            crate::run_to_end(job);

            let request = Msg::GetJob { sender: reply_sender.clone() };
            if let Err(err) = job_manager_sender.send(request) {
                crate::io::lock_write_err(__!("Failed to send message to job manager: {:?}\n", err));
                break;
            }

            job = match reply_receiver.recv() {
                Ok(Some(next_job)) => next_job,
                // Queue is empty, nothing more to do
                Ok(None) => break,
                Err(err) => {
                    crate::io::lock_write_err(__!("Failed to receive message from job manager: {:?}\n", err));
                    break;
                },
            };
        }

        let finished = Msg::JobFinished { handle };
        if let Err(err) = job_manager_sender.send(finished) {
            crate::io::lock_write_err(__!("Failed to send job-finished message to job manager: {}\n", err));
        }
    })
}

/// # Wrapper for `std:thread::available_parallelism()`
///
/// This function returns value of type [`ActiveLimit`][type:ActiveLimit]. It helps working with
/// [`BlackHole::make()`][fn:BlackHole/make].
///
/// See [`available_parallelism()`][fn:std/thread/available_parallelism] for more details.
///
/// [type:ActiveLimit]: type.ActiveLimit.html
/// [fn:BlackHole/make]: struct.BlackHole.html#method.make
/// [fn:std/thread/available_parallelism]: https://doc.rust-lang.org/std/thread/fn.available_parallelism.html
pub fn available_parallelism() -> Result<ActiveLimit> {
    ActiveLimit::try_from(thread::available_parallelism()?.get()).map_err(|e| Error::new(ErrorKind::Other, __!("{}", e)))
}