// random_pool/lib.rs
1extern crate rand;
2
3use rand::{thread_rng, Rng};
4
5use std::sync::Arc;
6use std::sync::{Mutex, MutexGuard};
7
/// A threadsafe, fixed-size, pool that holds elements that are each individually guarded behind a Mutex.
///
/// When getting an element, a random element is selected from the pool, locked, and returned.
/// If a lock for the random element cannot be gotten, the pool will try the next available element.
///
/// The random nature of the pool makes it particularly useful for pooling mutable resources that are fungible,
/// like dynamic caches.
/// If elements are attempted to be inserted into the caches, with a random insertion pattern,
/// the caches will trend towards having the same contents.
///
/// Cloning a `RandomPool` is cheap: the derived `Clone` clones the `Vec` of `Arc`s, so the
/// clone shares the very same `Mutex`-guarded elements as the original handle.
#[derive(Clone, Debug)]
pub struct RandomPool<T> {
    // One `Mutex` per element, so locking one element never blocks access to the others.
    // `Arc` keeps each element alive across cloned pool handles.
    elements: Vec<Arc<Mutex<T>>>
}
22
23impl<T> RandomPool<T> {
24
25 /// Create a new pool.
26 ///
27 /// # Arguments
28 ///
29 /// * `number_of_elements` - Number of Ts that will be in the pool.
30 /// * `element_creation_function` - The function that is used to create each element.
31 /// This will be called the number of times specified by the `number_of_elements` argument.
32 ///
33 /// # Concurrency
34 ///
35 /// You should want the number of elements to correspond to the number of threads that may access the pool.
36 /// Any more, and you are wasting space for elements that won't relieve lock contention.
37 /// Any less, and try_get() may start to return None, and get() may spinlock, as all elements may be locked at once.
38 pub fn new(number_of_elements: usize, element_creation_function: fn() -> T) -> RandomPool<T> {
39 let mut elements: Vec<Arc<Mutex<T>>> = vec!();
40
41 for _ in 0..number_of_elements {
42 elements.push(Arc::new(Mutex::new(element_creation_function())))
43 }
44 RandomPool {
45 elements: elements
46 }
47 }
48
49
50 /// Try to get a random element from the pool.
51 /// If all elements are locked, this will return `None`.
52 ///
53 /// # Concurrency
54 ///
55 /// This will not spinlock if all elements are locked.
56 ///
57 /// It is possible for this to miss an unlocked element if an element that has been passed over
58 /// because it was locked, becomes unlocked after it was checked, but before the method ends.
59 ///
60 /// Despite how rare this event is, it is unwise to call `unwrap()` on the Option returned
61 /// from this function, as this may return `None` because of this concurrency quirk.
62 pub fn try_get<'a>(&'a self) -> Option<MutexGuard<'a, T>> {
63
64 // Randomize the range that can be accessed
65 let mut range: Vec<usize> = (0..self.elements.len()).collect();
66 thread_rng().shuffle(range.as_mut_slice());
67
68 for i in range.into_iter() {
69 if let Some(c) = self.elements[i].try_lock().ok() {
70 return Some(c) // Found a cache that wasn't locked
71 }
72 }
73 None // All caches are occupied
74 }
75
76 /// Attempts to return a random element from the pool.
77 /// If the first element is locked, it will try the next random element.
78 /// If all elements are locked, the pool will deadlock until one of the locks frees itself.
79 ///
80 /// # Concurrency
81 ///
82 /// This will spinlock if all locks in the pool are taken.
83 pub fn get<'a>(&'a self) -> MutexGuard<'a, T> {
84 // Randomize the range that can be accessed
85 let mut range: Vec<usize> = (0..self.elements.len()).collect();
86 thread_rng().shuffle(range.as_mut_slice());
87
88 let mut index: usize = 0;
89 loop {
90 match self.elements[index].try_lock().ok() {
91 Some(element) => return element,
92 None => index = (index + 1) % self.elements.len()
93 }
94 }
95 }
96
97 /// Alter every element in the pool by locking them one at a time.
98 ///
99 /// # Arguments
100 ///
101 /// * `function` - The function that will be called on every element in the pool.
102 ///
103 /// # Concurrency
104 ///
105 /// If a lock for any of the pooled elements is held elsewhere, then this function will block until
106 /// a lock for the given element can be owned by this function.
107 /// As a result, this function may take quite a while to complete.
108 ///
109 /// The benefit of this approach, is that it will not effectively lock the whole pool, only one element at a time.
110 /// This should only degrade the max performance of the pool to `(n-1)/n`,
111 /// with `n` being the number of elements in the cache,
112 /// instead of 0 while this function is executed.
113 pub fn access_all<'a>(&'a self, function: fn(MutexGuard<'a, T>) ) {
114 for e in self.elements.iter() {
115 // All entries in the pooled try to lock, one at a time, so that the provided function
116 // can operate on the pool's contents.
117 function(e.lock().unwrap())
118 }
119 }
120}
121
#[cfg(test)]
mod tests{
    use super::*;

    use std::thread;

    use std::time;

    /// Single-threaded smoke test: 1,000,000 increments spread over 4 counters.
    /// This test can fail, although it is probabilistically unlikely to do so.
    #[test]
    fn counter() {
        let pool: RandomPool<usize> = RandomPool::new(4, || 0);

        for _ in 0..1_000_000 {
            // Single-threaded, so no element is ever locked: `unwrap()` is safe here.
            *pool.try_get().unwrap() += 1;
        }
        // Expected value for one counter is 250,000; allow a wide tolerance band.
        assert!(*pool.try_get().unwrap() > 200_000);
        assert!(*pool.try_get().unwrap() < 300_000);
    }

    /// Two threads each perform 500,000 increments concurrently.
    /// This test can fail, although it is probabilistically unlikely to do so.
    #[test]
    fn counter_concurrent() {
        // Assign 0 to all 4 initial counters.
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(4, || 0));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        let thread_1 = thread::spawn(move || {
            for _ in 0..500_000 {
                // NOTE(review): `unwrap()` here relies on the pool (4 elements) never being
                // fully locked by the 2 threads — see `try_get`'s concurrency caveat.
                *pool_reference_copy_1.try_get().unwrap() += 1;
            }
        });

        let thread_2 = thread::spawn(move || {
            for _ in 0..500_000 {
                *pool_reference_copy_2.try_get().unwrap() += 1;
            }
        });

        let _ = thread_1.join();
        let _ = thread_2.join();


        // Because both threads add 500,000, split among 4 counters, the expected value for any
        // of the counters is 250,000.
        assert!(*pool.try_get().unwrap() > 200_000);
        assert!(*pool.try_get().unwrap() < 300_000);

    }


    /// `access_all` must visit every element, so any element read afterwards shows the update.
    #[test]
    fn alter_all() {
        // Assign 0 to all initial counters
        let pool: RandomPool<usize> = RandomPool::new(4, || 0);

        pool.access_all(|mut x: MutexGuard<usize>| *x = 400 );

        // A `let` binding is needed here to increase the lifetime of the value from the pool.
        let value_from_pool = *pool.try_get().unwrap();

        // Because a function setting the counter to 400 was ran on every element in the pool,
        // the value for any random element is 400.
        assert_eq!(value_from_pool, 400);

    }


    /// `try_get` must return `None` while every element is locked, and `Some` afterwards.
    #[test]
    fn locks_taken() {
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(2, || 7));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        // Thread 1 owns a lock for 1 second
        let _thread_1 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_1.try_get().unwrap();
            let one_sec = time::Duration::from_millis(1_000);
            thread::sleep(one_sec);
        });

        // Thread 2 owns a lock for 1 second
        let _thread_2 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_2.try_get().unwrap();
            let one_sec = time::Duration::from_millis(1_000);
            thread::sleep(one_sec);
        });

        // The main thread waits for half a second, then tries to get a lock.
        let half_a_sec = time::Duration::from_millis(500);
        thread::sleep(half_a_sec);

        // This will fail, because all elements in the pool are locked
        assert!(pool.try_get().is_none());
        // Sleep another second so both holder threads release their locks.
        let one_sec = time::Duration::from_millis(1_000);
        thread::sleep(one_sec);
        // The locks will have been unlocked by now and the pool will return an element.
        assert!(pool.try_get().is_some());
    }

    /// `try_get` must return immediately even when `get` would have to spin.
    #[test]
    fn spinlock() {
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(2, || 7));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        // Get the time before the threads are spawned
        let initial_time = time::Instant::now();

        // Thread 1 owns a lock for 1 second
        let _thread_1 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_1.try_get().unwrap();
            let one_sec = time::Duration::from_millis(1_000);
            thread::sleep(one_sec);
        });

        // Thread 2 owns a lock for 1 second
        let _thread_2 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_2.try_get().unwrap();
            let one_sec = time::Duration::from_millis(1_000);
            thread::sleep(one_sec);
        });

        // The main thread waits for half a second, then tries to get a lock.
        // This is to make sure that the earlier threads do get their locks.
        let half_a_sec = time::Duration::from_millis(500);
        thread::sleep(half_a_sec);

        // This will not spinlock, and instead return immediately.
        assert!(pool.try_get().is_none());
        // Because `try_get()` won't spinlock, it can be assumed that this operation will take
        // less than the remaining 500 ms.
        assert!(initial_time.elapsed() < time::Duration::from_millis(1_000) );


        // This will spinlock
        let _locked_value = pool.get();

        // Even though the `get()` is called after half a second, it must spin for another
        // half a second to wait for one of the threads to release one of their locks.
        // When this happens, the spinlock will gain access to the lock, and this assertion can run.
        assert!(initial_time.elapsed() >= time::Duration::from_millis(1_000))


    }

}