random_pool/
lib.rs

1extern crate rand;
2
3use rand::{thread_rng, Rng};
4
5use std::sync::Arc;
6use std::sync::{Mutex, MutexGuard};
7
/// A fixed-size pool of interchangeable elements, each one guarded by its own `Mutex`
/// and held through an `Arc`.
///
/// Getting an element selects one at random, locks it, and hands back the guard.
/// When the selected element's lock cannot be taken, the pool moves on to the next candidate.
///
/// Random selection makes the pool a good fit for mutable resources that are fungible,
/// such as dynamic caches: when insertions are attempted against randomly chosen caches,
/// the caches trend towards holding the same contents.
///
/// Cloning a pool clones the `Arc` handles rather than the elements themselves,
/// so a clone shares its elements (and their locks) with the pool it was cloned from.
#[derive(Debug, Clone)]
pub struct RandomPool<T> {
    /// The pooled elements; the length is fixed when the pool is created.
    elements: Vec<Arc<Mutex<T>>>,
}
22
23impl<T> RandomPool<T> {
24
25    /// Create a new pool.
26    ///
27    /// # Arguments
28    ///
29    /// * `number_of_elements` - Number of Ts that will be in the pool.
30    /// * `element_creation_function` - The function that is used to create each element.
31    /// This will be called the number of times specified by the `number_of_elements` argument.
32    ///
33    /// # Concurrency
34    ///
35    /// You should want the number of elements to correspond to the number of threads that may access the pool.
36    /// Any more, and you are wasting space for elements that won't relieve lock contention.
37    /// Any less, and try_get() may start to return None, and get() may spinlock, as all elements may be locked at once.
38    pub fn new(number_of_elements: usize, element_creation_function: fn() -> T) -> RandomPool<T> {
39        let mut elements: Vec<Arc<Mutex<T>>> = vec!();
40
41        for _ in 0..number_of_elements {
42            elements.push(Arc::new(Mutex::new(element_creation_function())))
43        }
44        RandomPool {
45            elements: elements
46        }
47    }
48
49
50    /// Try to get a random element from the pool.
51    /// If all elements are locked, this will return `None`.
52    ///
53    /// # Concurrency
54    ///
55    /// This will not spinlock if all elements are locked.
56    ///
57    /// It is possible for this to miss an unlocked element if an element that has been passed over
58    /// because it was locked, becomes unlocked after it was checked, but before the method ends.
59    ///
60    /// Despite how rare this event is, it is unwise to call `unwrap()` on the Option returned
61    /// from this function, as this may return `None` because of this concurrency quirk.
62    pub fn try_get<'a>(&'a self) -> Option<MutexGuard<'a, T>> {
63
64        // Randomize the range that can be accessed
65        let mut range: Vec<usize> = (0..self.elements.len()).collect();
66        thread_rng().shuffle(range.as_mut_slice());
67
68        for i in range.into_iter() {
69            if let Some(c) = self.elements[i].try_lock().ok() {
70                return Some(c) // Found a cache that wasn't locked
71            }
72        }
73        None // All caches are occupied
74    }
75
76    /// Attempts to return a random element from the pool.
77    /// If the first element is locked, it will try the next random element.
78    /// If all elements are locked, the pool will deadlock until one of the locks frees itself.
79    ///
80    /// # Concurrency
81    ///
82    /// This will spinlock if all locks in the pool are taken.
83    pub fn get<'a>(&'a self) -> MutexGuard<'a, T> {
84        // Randomize the range that can be accessed
85        let mut range: Vec<usize> = (0..self.elements.len()).collect();
86        thread_rng().shuffle(range.as_mut_slice());
87
88        let mut index: usize = 0;
89        loop {
90            match self.elements[index].try_lock().ok() {
91                Some(element) => return element,
92                None => index = (index + 1) % self.elements.len()
93            }
94        }
95    }
96
97    /// Alter every element in the pool by locking them one at a time.
98    ///
99    /// # Arguments
100    ///
101    /// * `function` - The function that will be called on every element in the pool.
102    ///
103    /// # Concurrency
104    ///
105    /// If a lock for any of the pooled elements is held elsewhere, then this function will block until
106    /// a lock for the given element can be owned by this function.
107    /// As a result, this function may take quite a while to complete.
108    ///
109    /// The benefit of this approach, is that it will not effectively lock the whole pool, only one element at a time.
110    /// This should only degrade the max performance of the pool to `(n-1)/n`,
111    /// with `n` being the number of elements in the cache,
112    /// instead of 0 while this function is executed.
113    pub fn access_all<'a>(&'a self, function: fn(MutexGuard<'a, T>) ) {
114        for e in self.elements.iter() {
115            // All entries in the pooled try to lock, one at a time, so that the provided function
116            // can operate on the pool's contents.
117            function(e.lock().unwrap())
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::{Arc, MutexGuard};
    use std::thread;
    use std::time;

    /// Snapshot the value of every counter in `pool`, locking one element at a time.
    fn counter_values(pool: &RandomPool<usize>) -> Vec<usize> {
        pool.elements
            .iter()
            .map(|element| *element.lock().unwrap())
            .collect()
    }

    /// This test can fail, although it is probabilistically unlikely to do so.
    #[test]
    fn counter() {
        let pool: RandomPool<usize> = RandomPool::new(4, || 0);

        for _ in 0..1_000_000 {
            *pool.try_get().unwrap() += 1;
        }

        let counters = counter_values(&pool);
        // No increment may be lost, whichever counter it landed on.
        assert_eq!(counters.iter().sum::<usize>(), 1_000_000);
        // Expected value for one counter is 250,000.
        for count in counters {
            assert!(count > 200_000);
            assert!(count < 300_000);
        }
    }

    /// This test can fail, although it is probabilistically unlikely to do so.
    #[test]
    fn counter_concurrent() {
        // Assign 0 to all 4 initial counters.
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(4, || 0));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        let thread_1 = thread::spawn(move || {
            for _ in 0..500_000 {
                *pool_reference_copy_1.try_get().unwrap() += 1;
            }
        });

        let thread_2 = thread::spawn(move || {
            for _ in 0..500_000 {
                *pool_reference_copy_2.try_get().unwrap() += 1;
            }
        });

        // Propagate worker panics instead of discarding them, so that a failed `unwrap()`
        // inside a thread fails the test.
        thread_1.join().expect("thread 1 panicked");
        thread_2.join().expect("thread 2 panicked");

        let counters = counter_values(&pool);
        // Both threads add 500,000, and every increment happens under a lock.
        assert_eq!(counters.iter().sum::<usize>(), 1_000_000);
        // Split among 4 counters, the expected value for any of the counters is 250,000.
        for count in counters {
            assert!(count > 200_000);
            assert!(count < 300_000);
        }
    }

    #[test]
    fn alter_all() {
        // Assign 0 to all initial counters
        let pool: RandomPool<usize> = RandomPool::new(4, || 0);

        pool.access_all(|mut x: MutexGuard<usize>| *x = 400);

        // Because a function setting the counter to 400 was run on every element in the pool,
        // every element must now be 400.
        assert_eq!(counter_values(&pool), vec![400, 400, 400, 400]);
    }

    #[test]
    fn locks_taken() {
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(2, || 7));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        // Thread 1 owns a lock for 1 second
        let thread_1 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_1.try_get().unwrap();
            thread::sleep(time::Duration::from_millis(1_000));
        });

        // Thread 2 owns a lock for 1 second
        let thread_2 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_2.try_get().unwrap();
            thread::sleep(time::Duration::from_millis(1_000));
        });

        // The main thread waits for half a second, then tries to get a lock.
        thread::sleep(time::Duration::from_millis(500));

        // This will fail, because all elements in the pool are locked
        assert!(pool.try_get().is_none());
        // Sleep another second
        thread::sleep(time::Duration::from_millis(1_000));
        // The locks will have been unlocked by now and the pool will return an element.
        assert!(pool.try_get().is_some());

        // Both threads have finished sleeping by now; joining surfaces any panic they hit.
        thread_1.join().expect("thread 1 panicked");
        thread_2.join().expect("thread 2 panicked");
    }

    #[test]
    fn spinlock() {
        let pool: Arc<RandomPool<usize>> = Arc::new(RandomPool::new(2, || 7));
        let pool_reference_copy_1: Arc<RandomPool<usize>> = pool.clone();
        let pool_reference_copy_2: Arc<RandomPool<usize>> = pool.clone();

        // Get the time before the threads are spawned
        let initial_time = time::Instant::now();

        // Thread 1 owns a lock for 1 second
        let thread_1 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_1.try_get().unwrap();
            thread::sleep(time::Duration::from_millis(1_000));
        });

        // Thread 2 owns a lock for 1 second
        let thread_2 = thread::spawn(move || {
            let _locked_value = pool_reference_copy_2.try_get().unwrap();
            thread::sleep(time::Duration::from_millis(1_000));
        });

        // The main thread waits for half a second, then tries to get a lock.
        // This is to make sure that the earlier threads do get their locks.
        thread::sleep(time::Duration::from_millis(500));

        // This will not spin, and instead return immediately.
        assert!(pool.try_get().is_none());
        // Because `try_get()` won't spin, it can be assumed that this operation will take
        // less than the remaining 500 ms.
        assert!(initial_time.elapsed() < time::Duration::from_millis(1_000));

        // This will spin
        let _locked_value = pool.get();

        // Even though the `get()` is called after half a second, it must spin for another
        // half a second to wait for one of the threads to release one of their locks.
        // When this happens, `get()` will gain access to the lock, and this assertion can run.
        assert!(initial_time.elapsed() >= time::Duration::from_millis(1_000));

        // Joining surfaces any panic the helper threads hit while taking their locks.
        thread_1.join().expect("thread 1 panicked");
        thread_2.join().expect("thread 2 panicked");
    }
}