1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// License: see LICENSE file at root directory of `master` branch

//! # Root

use std::{
    convert::TryFrom,
    io::{Error, ErrorKind},
    sync::{
        Arc, Mutex,
        mpsc::{self, Receiver, RecvTimeoutError, Sender, SendError, SyncSender, TrySendError},
    },
    thread::{self, JoinHandle},
    time::Duration,
};

use crate::{Job, Result};

/// # Active limit
///
/// Maximum number of job runner threads that the job manager keeps running at the same time. It is deliberately a different
/// type from the queue limit (`usize`), so the two arguments of [`BlackHole::make()`] are harder to mix up.
pub type ActiveLimit = u32;

/// # Join handle of the job manager thread
type JobManagerHandle = JoinHandle<()>;
/// # Join handle of a job runner thread
type JobRunnerHandle = JoinHandle<()>;
/// # Liveness token of a job runner
///
/// The job manager keeps one clone in its list of active jobs, the job runner owns the other one. The manager considers a runner
/// gone when it is the only remaining owner of the token (see the reference-count check in `spawn_job_manager()`).
type JobHandle = Arc<Mutex<()>>;

/// # Message
///
/// Everything the job manager thread receives goes through this type. Messages are sent by [`BlackHole`] (new jobs, stop
/// requests) and by job runner threads (requests for more work, completion notices).
#[derive(Debug)]
enum Msg<J> where J: Job {

    /// # New job
    ///
    /// Sent by `BlackHole::throw()`.
    NewJob {

        /// # Job
        job: J,

        /// # Reply channel, used by job manager to report what happened to the job
        ///
        /// - If new job is accepted, job manager sends back `None`.
        /// - If new job is not accepted, job manager sends back the original job.
        sender: Sender<Option<J>>,

    },

    /// # Request from a job runner for another job from the queue
    ///
    /// Sent by a job runner each time it has finished running a job.
    GetJob {

        /// # Reply channel
        ///
        /// Job manager sends back `Some(job)` taken from its queue, or `None` if the queue is empty.
        sender: Sender<Option<J>>,

    },

    /// # To notify job manager that a job runner has finished
    JobFinished {

        /// # Job handle
        ///
        /// This is the runner's clone of the handle. The job manager drops it, then prunes its list of active jobs.
        handle: JobHandle,

    },

    /// # Stop job manager
    ///
    /// The job manager leaves its loop as soon as it receives this message.
    StopJobManager,

    /// # Stop job manager on idle
    ///
    /// The job manager leaves its loop once there are no active jobs and its job queue is empty.
    StopJobManagerOnIdle,

}

/// # Black Hole
///
/// A job pool: jobs thrown into it are run on background threads. A job manager thread limits how many jobs run at the same time
/// and keeps the rest in a bounded queue.
///
/// ## Usage
///
/// You _should_ call [`escape_on_idle()`][::escape_on_idle()] when done with this.
///
/// Be careful when you manually call `drop()` on a black hole, or simply let it go out of scope: _background threads are left
/// unmanaged_. They will keep running until the main thread exits. Then they will be cancelled/discarded by the system.
///
/// ## Notes
///
/// Messages printed by background threads are prefixed with [`TAG`][crate::TAG].
///
/// [::escape_on_idle()]: struct.BlackHole.html#method.escape_on_idle
/// [crate::TAG]: constant.TAG.html
#[derive(Debug)]
pub struct BlackHole<J> where J: Job {

    /// # Job manager handle
    ///
    /// Join handle of the job manager thread; `escape_on_idle()` joins it.
    job_manager: JobManagerHandle,

    /// # Job sender
    ///
    /// Sending half of the bounded channel whose receiving half is owned by the job manager thread.
    job_sender: SyncSender<Msg<J>>,

}

impl<J> BlackHole<J> where J: Job {

    /// # Makes new instance
    ///
    /// This spawns the job manager thread and connects to it through a bounded channel of capacity `queue_limit`.
    ///
    /// ## Notes
    ///
    /// - Active limit and queue limit are declared in different types to help prevent typos.
    /// - An error is returned if either active limit or queue limit is zero.
    /// - Internal vectors used for jobs will be made immediately with capacities of active limit and queue limit.
    /// - A warning is printed if queue limit does not fit in 32 bits.
    ///
    /// ## Errors
    ///
    /// - `ErrorKind::InvalidData` if either limit is zero.
    /// - Any error returned while spawning the job manager.
    pub fn make(active_limit: ActiveLimit, queue_limit: usize) -> Result<Self> {
        if active_limit == 0 {
            return Err(Error::new(ErrorKind::InvalidData, "Active limit must be larger than zero"));
        }

        if queue_limit == 0 {
            return Err(Error::new(ErrorKind::InvalidData, "Queue limit must be larger than zero"));
        }
        // Same threshold as "queue_limit >= 2^32", but written as a conversion so that 2^32 itself is never computed in `usize`:
        // that computation overflows on targets where `usize` is 32 bits wide. On such targets (and narrower ones) the conversion
        // always succeeds, so no warning is printed.
        if u32::try_from(queue_limit).is_err() {
            __e!("Warning: maybe queue limit is too large? -> {}", queue_limit);
        }

        let (sender, receiver) = mpsc::sync_channel::<Msg<J>>(queue_limit);
        Ok(Self {
            // The job manager gets its own clone of the sender; it hands further clones to the job runners it spawns.
            job_manager: spawn_job_manager(receiver, sender.clone(), active_limit, queue_limit)?,
            job_sender: sender,
        })
    }

    /// # Throws new job into the black hole
    ///
    /// - If the job is accepted, `None` is returned.
    /// - If the job is not accepted, it is returned to you for recovery. This happens when the channel to the job manager is full,
    ///   or when the job manager itself sends the job back.
    ///
    /// ## Notes
    ///
    /// Handing the job over never blocks. But once the job is handed over, this function waits for the job manager's answer.
    ///
    /// ## Errors
    ///
    /// `ErrorKind::BrokenPipe` if the job manager is disconnected, or if it goes away without answering.
    pub fn throw(&self, job: J) -> Result<Option<J>> {
        // One-shot reply channel: its only sender travels inside the message, so `recv()` fails (instead of blocking forever) if
        // the message is dropped without an answer.
        let (sender, receiver) = mpsc::channel();
        match self.job_sender.try_send(Msg::NewJob { job, sender }) {
            Ok(()) => receiver.recv().map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed waiting for job manager: {}", e))),
            // Channel is full: take the job back out of the rejected message and return it to caller
            Err(TrySendError::Full(Msg::NewJob { job, .. })) => Ok(Some(job)),
            // Only `Msg::NewJob` is sent above, so any other variant here would be a bug
            Err(TrySendError::Full(_)) => Err(Error::new(ErrorKind::Other, __!("Internal error"))),
            Err(TrySendError::Disconnected(_)) => Err(Error::new(ErrorKind::BrokenPipe, "Job manager is disconnected")),
        }
    }

    /// # Escapes the black hole
    ///
    /// ## Notes
    ///
    /// - New jobs and all waiting jobs are discarded.
    /// - Active jobs are left untouched. They _will_ be discarded by system when the program's main thread exits.
    /// - This function only sends the stop-message; it does not wait for the job manager to exit.
    ///
    /// ## Safety
    ///
    /// The body performs no memory-unsafe operation. NOTE(review): `unsafe` appears to be used here as a warning marker, because
    /// jobs can be lost and active jobs can be cut off at program exit -- confirm with the author. Callers should make sure that
    /// losing waiting jobs and abandoning active ones is acceptable; otherwise use `escape_on_idle()`.
    ///
    /// ## Errors
    ///
    /// `ErrorKind::BrokenPipe` if the stop-message cannot be sent to the job manager.
    pub unsafe fn escape(self) -> Result<()> {
        self.job_sender.send(Msg::StopJobManager)
            .map_err(|e| Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-message to job manager: {}", e)))
    }

    /// # Escapes the black hole on idle and waits for it
    ///
    /// Asks the job manager to stop once it is idle, then blocks until the job manager thread has exited.
    ///
    /// ## Errors
    ///
    /// - `ErrorKind::BrokenPipe` if the stop-on-idle message cannot be sent to the job manager.
    /// - `ErrorKind::Other` if joining the job manager thread fails (the thread panicked).
    pub fn escape_on_idle(self) -> Result<()> {
        match self.job_sender.send(Msg::StopJobManagerOnIdle) {
            Ok(()) => {
                // Nothing more will be sent from this side
                drop(self.job_sender);
                self.job_manager.join().map_err(|err|
                    Error::new(ErrorKind::Other, format!("Failed to wait for job manager to finished: {:?}", err))
                )
            },
            Err(err) => Err(Error::new(ErrorKind::BrokenPipe, format!("Failed sending stop-on-idle message to job manager: {}", err))),
        }
    }

}

/// # Spawns job manager
///
/// The job manager is a thread which owns the list of active jobs and the queue of waiting jobs. It handles messages from
/// `job_manager_receiver` until it is told to stop. If no message arrives within 50 milliseconds, it prunes the list of active
/// jobs and re-checks the stop-on-idle condition.
///
/// ## Parameters
///
/// - `job_manager_receiver`: where messages come from.
/// - `job_manager_sender`: sender of the same channel; a clone is given to every job runner so it can talk back.
/// - `active_limit`: maximum number of job runners at the same time.
/// - `queue_limit`: maximum number of waiting jobs.
///
/// ## Notes
///
/// - Waiting jobs are taken with `Vec::pop()`, so the most recently queued job is started first.
/// - Queued jobs are started whenever there are free slots, including after a runner has disappeared without sending
///   `Msg::JobFinished` (for instance because its thread panicked). Otherwise such jobs would wait forever, and
///   `BlackHole::escape_on_idle()` would never return.
///
/// ## Errors
///
/// An error is returned if `active_limit` cannot be converted to `usize`.
fn spawn_job_manager<J>(
    job_manager_receiver: Receiver<Msg<J>>, job_manager_sender: SyncSender<Msg<J>>, active_limit: ActiveLimit, queue_limit: usize,
) -> Result<JobManagerHandle> where J: Job {
    let active_limit = usize::try_from(active_limit)
        .map_err(|_| Error::new(ErrorKind::Other, format!("Failed to convert active limit to usize: {}", active_limit)))?;
    Ok(thread::spawn(move || {
        let mut active_jobs = Vec::with_capacity(active_limit);
        let mut job_queue = Vec::with_capacity(queue_limit);
        let mut stop_on_idle = false;

        // Removes handles of runners which are gone. A runner is alive as long as someone else still owns a reference to its
        // handle.
        // NOTE(review): nothing in this file locks the handle's mutex or makes weak references to it, so the poison check and the
        // weak-count check look like they can never fire -- confirm.
        let update_active_jobs = |jobs: &mut Vec<JobHandle>| jobs.retain(|j| {
            if j.is_poisoned() {
                __e!("Found 1 panicked job");
                false
            } else {
                Arc::weak_count(j) > 1 || Arc::strong_count(j) > 1
            }
        });
        // Starts a runner for given job and registers its handle
        let spawn_job = |job, jobs: &mut Vec<_>| {
            let handle = Arc::new(Mutex::new(()));
            jobs.push(handle.clone());
            spawn_job_runner(job_manager_sender.clone(), job, handle);
        };

        loop {
            match job_manager_receiver.recv_timeout(Duration::from_millis(50)) {
                Ok(Msg::NewJob { job, sender }) => {
                    // Run it right away if there is a free slot; otherwise queue it; otherwise give it back
                    let job = if active_jobs.len() < active_limit {
                        spawn_job(job, &mut active_jobs);
                        None
                    } else if job_queue.len() < queue_limit {
                        job_queue.push(job);
                        None
                    } else {
                        __e!("Discarding new job because job queue has reached its limit...");
                        Some(job)
                    };
                    if let Err(err) = sender.send(job) {
                        __e!("Failed sending back job-message to BlackHole instance: {}", err);
                    }
                },
                // If the runner is gone, the job comes back inside the error: put it back into the queue
                Ok(Msg::GetJob { sender }) => if let Err(SendError(job)) = sender.send(job_queue.pop()) {
                    if let Some(job) = job {
                        job_queue.push(job);
                    }
                },
                Ok(Msg::JobFinished { handle }) => {
                    // Dropping the runner's clone first lets the pruning below see that this runner is gone
                    drop(handle);
                    update_active_jobs(&mut active_jobs);
                    // Fill every free slot, not just the one which was released by this runner
                    while active_jobs.len() < active_limit {
                        match job_queue.pop() {
                            Some(job) => spawn_job(job, &mut active_jobs),
                            None => break,
                        }
                    }
                    if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Ok(Msg::StopJobManager) => {
                    __e!("Warning: escaping BlackHole while there are {} active job(s) and unknown jobs in queue", active_jobs.len());
                    break;
                },
                Ok(Msg::StopJobManagerOnIdle) => {
                    stop_on_idle = true;
                    if active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Err(RecvTimeoutError::Timeout) => {
                    update_active_jobs(&mut active_jobs);
                    // Runners which died silently never send `Msg::JobFinished`, so this is the only place where the slots they
                    // held can be given to waiting jobs.
                    while active_jobs.len() < active_limit {
                        match job_queue.pop() {
                            Some(job) => spawn_job(job, &mut active_jobs),
                            None => break,
                        }
                    }
                    if stop_on_idle && active_jobs.is_empty() && job_queue.is_empty() {
                        break;
                    }
                },
                Err(RecvTimeoutError::Disconnected) => {
                    __p!("Stopping BlackHole...");
                    break;
                },
            };
        }
    }))
}

/// # Spawns job runner
///
/// The runner is a thread which runs `job` via `crate::run_to_end()`, then keeps asking the job manager for another job
/// (`Msg::GetJob`) and runs whatever it gets, until the job manager answers `None` or communication fails. Before exiting, it
/// gives `handle` back to the job manager via `Msg::JobFinished`.
///
/// ## Parameters
///
/// - `job_manager_sender`: channel to the job manager.
/// - `job`: first job to run.
/// - `handle`: the runner's clone of its job handle.
fn spawn_job_runner<J>(job_manager_sender: SyncSender<Msg<J>>, mut job: J, handle: JobHandle) -> JobRunnerHandle where J: Job {
    thread::spawn(move || {
        loop {
            crate::run_to_end(job);

            // A fresh reply channel is made for every request, and its only sender is moved into the message. If the job manager
            // goes away without answering, the message (and the sender in it) is dropped, so `recv()` below returns an error.
            // Keeping a sender on this side would make `recv()` block forever in that case.
            let (sender, receiver) = mpsc::channel();
            if let Err(err) = job_manager_sender.send(Msg::GetJob { sender }) {
                __e!("Failed to send message to job manager: {:?}", err);
                break;
            }
            match receiver.recv() {
                Ok(Some(new_job)) => job = new_job,
                // Queue is empty: nothing more to do
                Ok(None) => break,
                Err(err) => {
                    __e!("Failed to receive message from job manager: {:?}", err);
                    break;
                },
            };
        }
        if let Err(err) = job_manager_sender.send(Msg::JobFinished { handle }) {
            __e!("Failed to send job-finished message to job manager: {}", err);
        }
    })
}