workerpool_rs/pool.rs
1//! ## Pool
2//!
3//! With this module, we are able to synchronize channels,
4//! start jobs, wait for workers, and many others concurrent
5//! tasks are made easy.
6
7use std::{
8 fmt::Display,
9 sync::{mpsc, Arc, Mutex},
10 thread,
11};
12
13// Basic types for concurrent tasks
14type Job = Box<dyn FnOnce() + Send + Sync + 'static>;
15type JobReceiver = Arc<Mutex<mpsc::Receiver<Job>>>;
16type Handle = thread::JoinHandle<()>;
17
18/// Implements a continuous pool of rust threads thats doesn't stops
19/// unless it gets out of scope.
20///
21pub struct WorkerPool {
22 workers: Vec<Worker>,
23 sender: mpsc::Sender<Job>,
24}
25
26impl WorkerPool {
27 /// Constructs a new WorkerPool of size x.
28 ///
29 /// **size**: usize - Is the number of workers in WorkerPool object. \
30 /// **returns**: a WorkerPool object.
31 ///
32 /// # Examples
33 ///
34 /// ```
35 /// use workerpool_rs::pool::WorkerPool;
36 ///
37 /// let pool = WorkerPool::new(3);
38 ///
39 /// assert_eq!("workers[] = (id: 0)(id: 1)(id: 2)", pool.to_string());
40 /// ```
41 pub fn new(size: usize) -> WorkerPool {
42 let (tx, rx) = mpsc::channel();
43 let mut workers = Vec::<Worker>::with_capacity(size);
44 let rec = Arc::new(Mutex::new(rx));
45
46 for id in 0..size {
47 workers.push(Worker::new(id, Arc::clone(&rec)));
48 }
49
50 WorkerPool {
51 workers,
52 sender: tx,
53 }
54 }
55
56 /// Executes a job. The job is moved to closure, as this function is FnOnce. \
57 ///
58 /// **f**: A FnOnce closure hosted by a Box smart pointer.
59 /// ## Examples
60 ///
61 /// ```
62 /// use workerpool_rs::pool::WorkerPool;
63 /// use std::sync::mpsc;
64 /// use std::sync::{Arc, Mutex};
65 ///
66 /// let njobs = 20;
67 /// let nworkers = 10;
68 ///
69 /// let pool = WorkerPool::new(nworkers);
70 /// let (tx, rx) = mpsc::channel();
71 ///
72 /// let atx = Arc::new(Mutex::new(tx));
73 ///
74 /// for _ in 0 .. njobs {
75 /// let atx = atx.clone();
76 /// pool.execute(move || {
77 /// let tx = atx.lock().unwrap();
78 /// tx.send(1).unwrap();
79 /// });
80 /// }
81 ///
82 /// let sum = rx.iter().take(njobs).sum();
83 /// assert_eq!(njobs, sum);
84 /// ```
85 pub fn execute<J>(&self, f: J)
86 where
87 J: FnOnce() + Send + Sync + 'static,
88 {
89 let job = Box::new(f);
90 self.sender.send(job).expect("Cant send job");
91 }
92}
93
94// Implements Display for WorkerPool. This is usefull as we can able
95// to compare and make unit tests more easily.
96impl Display for WorkerPool {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 let mut buffer = String::new();
99 for i in &self.workers {
100 buffer.push_str(&i.to_string());
101 }
102 write!(f, "workers[] = {}", buffer)
103 }
104}
105
106// A structure that holds an id and thread handle.
107//
108// id: usize - An id for worker indentification.\
109// handle: JoinHandle<()> - a handle that has a working thread.
110struct Worker {
111 id: usize,
112 _handle: Handle,
113}
114
115impl Worker {
116 // Constructs a new Worker.
117 //
118 // id: usize - Worker identificator.
119 // handle: JoinHandle<()> - a thread handle.
120 fn new(id: usize, handle: JobReceiver) -> Worker {
121 let handle = thread::spawn(move || loop {
122 let job = match handle.lock().expect("Cant acquire lock").recv() {
123 Ok(data) => data,
124 Err(_) => continue,
125 };
126
127 job();
128 });
129
130 Worker {
131 id,
132 _handle: handle,
133 }
134 }
135}
136
137// Implements Display for Worker as this simplifys test writing.
138impl Display for Worker {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 write!(f, "(id: {})", self.id,)
141 }
142}
143
144// This sections are the beginning of workerpool module unit tests.
145#[cfg(test)]
146mod unit_tests {
147 use super::*;
148
149 #[test]
150 fn worker_should_return_new() {
151 let (_, rx) = mpsc::channel();
152 let receiver = Arc::new(Mutex::new(rx));
153 let w = Worker::new(1, Arc::clone(&receiver));
154 assert_eq!("(id: 1)", w.to_string());
155 }
156
157 #[test]
158 fn workerpool_should_return_new() {
159 let expected = "workers[] = (id: 0)(id: 1)(id: 2)".to_string();
160 let pool = WorkerPool::new(3);
161 assert_eq!(expected.to_string(), pool.to_string());
162 }
163
164 #[test]
165 fn workerpool_should_execute_job_succeed() {
166 let pool = WorkerPool::new(1);
167 for _ in 0..10000 {
168 pool.execute(|| {
169 let _sum = 3 + 1;
170 });
171 }
172 }
173}