1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//! A simple thread pool library for Rust.
//!
//! # Examples
//!
//! Basic usage:
//!
//! ```
//! use std::sync::mpsc;
//! use std::thread;
//! use std::time::Duration;
//! use tasque::TaskQueueBuilder;
//!
//! // Creates a task queue.
//! // This queue spawns worker threads for executing tasks.
//! let queue = TaskQueueBuilder::new().worker_count(3).finish();
//!
//! // Executes asynchronous tasks.
//! let (tx, rx) = mpsc::channel();
//! for (i, tx) in (0..3).map(|i| (i, tx.clone())) {
//!     queue.enqueue(move || {
//!         thread::sleep(Duration::from_millis(20 - i * 10));
//!         let _ = tx.send(i);
//!     });
//! }
//!
//! // Waits results.
//! assert_eq!(rx.recv().ok(), Some(2));
//! assert_eq!(rx.recv().ok(), Some(1));
//! assert_eq!(rx.recv().ok(), Some(0));
//! ```
//!
//! This library exposes some [prometheus] metrics:
//!
//! [prometheus]: https://prometheus.io/
//!
//! ```no_run
//! # extern crate tasque;
//! # extern crate prometrics;
//! use std::time::Duration;
//! use prometrics::default_gatherer;
//! use tasque::TaskQueueBuilder;
//!
//! # fn main() {
//! let queue = TaskQueueBuilder::new().worker_count(1).finish();
//! queue.enqueue(|| panic!());
//! queue.enqueue(|| {});
//! std::thread::sleep(Duration::from_millis(100));
//!
//! let metrics = default_gatherer().lock().unwrap().gather().to_text();
//! assert_eq!(metrics,
//! [
//!  "# HELP tasque_queue_dequeued_tasks_total Number of dequeued tasks",
//!  "# TYPE tasque_queue_dequeued_tasks_total counter",
//!  "tasque_queue_dequeued_tasks_total 2",
//!  "# HELP tasque_queue_enqueued_tasks_total Number of enqueued tasks",
//!  "# TYPE tasque_queue_enqueued_tasks_total counter",
//!  "tasque_queue_enqueued_tasks_total 2",
//!  "# HELP tasque_worker_restarts_total Number of worker restarts",
//!  "# TYPE tasque_worker_restarts_total counter",
//!  "tasque_worker_restarts_total 1",
//!  "# HELP tasque_worker_task_duration_seconds Execution time of tasks",
//!  "# TYPE tasque_worker_task_duration_seconds histogram",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"0.001\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"0.01\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"0.1\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"1\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"10\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"100\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_bucket{le=\"+Inf\",worker=\"0\"} 1",
//!  "tasque_worker_task_duration_seconds_sum{worker=\"0\"} 0.000001392",
//!  "tasque_worker_task_duration_seconds_count{worker=\"0\"} 1"
//! ].iter().map(|s| format!("{}\n", s)).collect::<Vec<_>>().join("")
//! );
//! # }
//! ```
#![warn(missing_docs)]
extern crate num_cpus;
extern crate prometrics;

pub use queue::{TaskQueue, TaskQueueBuilder};

mod metrics;
mod queue;
mod task;
mod worker;

#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;
    use super::*;

    #[test]
    fn single_worker_works() {
        let (tx, rx) = mpsc::channel();
        let queue = TaskQueueBuilder::new().worker_count(1).finish();
        for (i, tx) in (0..3).map(|i| (i, tx.clone())) {
            queue.enqueue(move || {
                thread::sleep(Duration::from_millis(30 - i * 10));
                let _ = tx.send(i);
            });
        }
        assert_eq!(rx.recv().ok(), Some(0));
        assert_eq!(rx.recv().ok(), Some(1));
        assert_eq!(rx.recv().ok(), Some(2));
    }

    #[test]
    fn multiple_workers_works() {
        let (tx, rx) = mpsc::channel();
        let queue = TaskQueueBuilder::new().worker_count(3).finish();
        for (i, tx) in (0..3).map(|i| (i, tx.clone())) {
            queue.enqueue(move || {
                thread::sleep(Duration::from_millis(20 - i * 10));
                let _ = tx.send(i);
            });
        }
        assert_eq!(rx.recv().ok(), Some(2));
        assert_eq!(rx.recv().ok(), Some(1));
        assert_eq!(rx.recv().ok(), Some(0));
    }

    #[test]
    fn worker_restart_works() {
        let (tx, rx) = mpsc::channel();
        let queue = TaskQueueBuilder::new().worker_count(1).finish();
        queue.enqueue(|| panic!());
        queue.enqueue(move || tx.send(0).unwrap());
        assert_eq!(rx.recv().ok(), Some(0));
    }
}