workerpool_rs/
pool.rs

1//! ## Pool
2//!
3//! With this module, we are able to synchronize channels,
4//! start jobs, wait for workers, and many others concurrent
5//! tasks are made easy.
6
7use std::{
8    fmt::Display,
9    sync::{mpsc, Arc, Mutex},
10    thread,
11};
12
13// Basic types for concurrent tasks
14type Job = Box<dyn FnOnce() + Send + Sync + 'static>;
15type JobReceiver = Arc<Mutex<mpsc::Receiver<Job>>>;
16type Handle = thread::JoinHandle<()>;
17
18/// Implements a continuous pool of rust threads thats doesn't stops
19/// unless it gets out of scope.
20///
21pub struct WorkerPool {
22    workers: Vec<Worker>,
23    sender: mpsc::Sender<Job>,
24}
25
26impl WorkerPool {
27    /// Constructs a new WorkerPool of size x.
28    ///
29    /// **size**: usize - Is the number of workers in WorkerPool object. \
30    /// **returns**: a WorkerPool object.
31    ///
32    /// # Examples
33    ///
34    /// ```
35    /// use workerpool_rs::pool::WorkerPool;
36    ///
37    /// let pool = WorkerPool::new(3);
38    ///
39    /// assert_eq!("workers[] = (id: 0)(id: 1)(id: 2)", pool.to_string());
40    /// ```
41    pub fn new(size: usize) -> WorkerPool {
42        let (tx, rx) = mpsc::channel();
43        let mut workers = Vec::<Worker>::with_capacity(size);
44        let rec = Arc::new(Mutex::new(rx));
45
46        for id in 0..size {
47            workers.push(Worker::new(id, Arc::clone(&rec)));
48        }
49
50        WorkerPool {
51            workers,
52            sender: tx,
53        }
54    }
55
56    /// Executes a job. The job is moved to closure, as this function is FnOnce. \
57    ///
58    /// **f**: A FnOnce closure hosted by a Box smart pointer.
59    /// ## Examples
60    ///
61    /// ```
62    /// use workerpool_rs::pool::WorkerPool;
63    /// use std::sync::mpsc;
64    /// use std::sync::{Arc, Mutex};
65    ///
66    /// let njobs = 20;
67    /// let nworkers = 10;
68    ///
69    /// let pool = WorkerPool::new(nworkers);
70    /// let (tx, rx) = mpsc::channel();
71    ///
72    /// let atx = Arc::new(Mutex::new(tx));
73    ///
74    /// for _ in 0 .. njobs {
75    ///     let atx = atx.clone();
76    ///     pool.execute(move || {
77    ///         let tx = atx.lock().unwrap();
78    ///         tx.send(1).unwrap();
79    ///     });
80    /// }
81    ///
82    /// let sum = rx.iter().take(njobs).sum();
83    /// assert_eq!(njobs, sum);
84    /// ```
85    pub fn execute<J>(&self, f: J)
86    where
87        J: FnOnce() + Send + Sync + 'static,
88    {
89        let job = Box::new(f);
90        self.sender.send(job).expect("Cant send job");
91    }
92}
93
94// Implements Display for WorkerPool. This is usefull as we can able
95// to compare and make unit tests more easily.
96impl Display for WorkerPool {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        let mut buffer = String::new();
99        for i in &self.workers {
100            buffer.push_str(&i.to_string());
101        }
102        write!(f, "workers[] = {}", buffer)
103    }
104}
105
106// A structure that holds an id and thread handle.
107//
108// id: usize - An id for worker indentification.\
109// handle: JoinHandle<()> - a handle that has a working thread.
110struct Worker {
111    id: usize,
112    _handle: Handle,
113}
114
115impl Worker {
116    // Constructs a new Worker.
117    //
118    // id: usize - Worker identificator.
119    // handle: JoinHandle<()> - a thread handle.
120    fn new(id: usize, handle: JobReceiver) -> Worker {
121        let handle = thread::spawn(move || loop {
122            let job = match handle.lock().expect("Cant acquire lock").recv() {
123                Ok(data) => data,
124                Err(_) => continue,
125            };
126
127            job();
128        });
129
130        Worker {
131            id,
132            _handle: handle,
133        }
134    }
135}
136
137// Implements Display for Worker as this simplifys test writing.
138impl Display for Worker {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        write!(f, "(id: {})", self.id,)
141    }
142}
143
144// This sections are the beginning of workerpool module unit tests.
145#[cfg(test)]
146mod unit_tests {
147    use super::*;
148
149    #[test]
150    fn worker_should_return_new() {
151        let (_, rx) = mpsc::channel();
152        let receiver = Arc::new(Mutex::new(rx));
153        let w = Worker::new(1, Arc::clone(&receiver));
154        assert_eq!("(id: 1)", w.to_string());
155    }
156
157    #[test]
158    fn workerpool_should_return_new() {
159        let expected = "workers[] = (id: 0)(id: 1)(id: 2)".to_string();
160        let pool = WorkerPool::new(3);
161        assert_eq!(expected.to_string(), pool.to_string());
162    }
163
164    #[test]
165    fn workerpool_should_execute_job_succeed() {
166        let pool = WorkerPool::new(1);
167        for _ in 0..10000 {
168            pool.execute(|| {
169                let _sum = 3 + 1;
170            });
171        }
172    }
173}