dfmutex/
lib.rs

1use std::sync::LockResult;
2use std::sync::{Arc, Mutex, MutexGuard};
3use std::thread::{self, JoinHandle};
4
5///
6/// Deadlock-free Mutex locks
7///
/// DFMutex is a library that provides a ***guaranteed deadlock-free*** Mutex
/// implementation for the Rust language. It is based on the research paper
/// "Higher-Order Leak and Deadlock Free Locks" by Jules Jacobs and Stephanie Balzer.
///
/// # Example
13/// ```rust
14/// use dfmutex::{DFMutex, spawn};
15/// 
16/// fn main() {
17///     // Create a Mutex with any owned value
18///     let m = DFMutex::new(String::from("Lorem Ipsum"));
19/// 
///     // Create a closure to run on each thread.
///     // The closure's argument type must match the type of the Mutex
///     // created above.
23///     let closure = |mut dfm: DFMutex<String>| {
24///         let data = dfm.lock().unwrap();
25///         
26///         // Use the data
27///         println!("{}", data);
28///     };
29/// 
30///     // Spawn 8 threads and store their handles
31///     let mut handles = Vec::new();
32///     for _ in 0..8 {
33///         handles.push(spawn(&m, closure));    
34///     }
35/// 
36///     // Join all the threads
37///     for handle in handles.into_iter() {
38///         handle.join().unwrap();
39///     }
40/// }
41/// ```
42
/// A deadlock-free mutual exclusion primitive useful for protecting shared data.
///
/// The protected value is stored in an `Arc<Mutex<T>>`. Duplicating a handle
/// is a private operation; the public way to give another thread a handle to
/// the same data is [`spawn`].
#[derive(Debug)]
pub struct DFMutex<T> {
    /// Reference-counted standard mutex that owns the protected value.
    internal: Arc<Mutex<T>>,
}

impl<T> DFMutex<T> {
    /// Creates a new mutex in an unlocked state ready for use.
    ///
    /// `data` is moved into the mutex and can afterwards only be reached
    /// through [`DFMutex::lock`].
    pub fn new(data: T) -> Self {
        let internal = Arc::new(Mutex::new(data));
        Self { internal }
    }

    /// Acquires the mutex, blocking the current thread until it is able to do so.
    ///
    /// The method takes `&mut self`, so the returned guard keeps this handle
    /// mutably borrowed for as long as the guard is alive.
    ///
    /// # Errors
    ///
    /// Forwards the result of the underlying [`Mutex::lock`] unchanged; the
    /// standard library reports an error there when the mutex is poisoned.
    pub fn lock(&mut self) -> LockResult<MutexGuard<'_, T>> {
        let mutex: &Mutex<T> = &self.internal;
        mutex.lock()
    }

    /// Creates another handle to the same underlying mutex.
    ///
    /// Deliberately an inherent, non-public method rather than a `Clone`
    /// implementation, so code outside this crate cannot duplicate handles.
    fn clone(&self) -> Self {
        let internal = Arc::clone(&self.internal);
        Self { internal }
    }
}
66
67/// Spawns a new thread, returning a [`JoinHandle`] for it.
68pub fn spawn<D, T, F>(odfm: &DFMutex<D>, f: F) -> JoinHandle<T>
69where
70    F: FnOnce(DFMutex<D>) -> T + Send + 'static,
71    D: Send + 'static,
72    T: Send + 'static,
73{
74    let codfm = odfm.clone();
75
76    thread::spawn(move || f(codfm))
77}
78
/// Constants and helpers shared by the test modules of this file.
///
/// Gated on `cfg(test)` so that these test-only items are not compiled into
/// regular builds of the library.
#[cfg(test)]
mod test_commons {
    /// Range iterated by the multi-lock tests to repeat their scenario.
    pub const TEST_ITERATIONS: std::ops::Range<i32> = 0..10;
    /// Range iterated by the tests when spawning threads: one thread per item.
    pub const THREADS_RANGE: std::ops::Range<i32> = 0..8;

    /// Fibonacci index evaluated by [`compute_intensive_task`].
    const TASK_BASE: u64 = 40;

    /// Returns the `n`-th Fibonacci number using plain double recursion.
    ///
    /// NOTE(review): the exponential running time looks intentional, since
    /// this backs `compute_intensive_task` — confirm before optimising.
    fn fibonacci(n: u64) -> u64 {
        match n {
            0 | 1 => n,
            _ => fibonacci(n - 1) + fibonacci(n - 2),
        }
    }

    /// Runs a CPU-bound computation and returns its result,
    /// `fibonacci(TASK_BASE)`.
    pub fn compute_intensive_task() -> u64 {
        fibonacci(TASK_BASE)
    }
}
97
/// Tests in which every worker thread locks one shared `DFMutex<String>`.
#[cfg(test)]
mod single_lock {
    use rand::thread_rng;
    use rand::Rng;
    use std::thread;
    use std::time::Duration;

    use super::spawn;
    use super::test_commons::*;
    use super::DFMutex;

    /// Spawns one thread per item of `THREADS_RANGE`, each running `worker`
    /// on its own handle to `shared`, and then joins every thread.
    ///
    /// All threads are started before the first join. A panic inside a
    /// worker surfaces here through `join().unwrap()`.
    fn run_workers<F>(shared: &DFMutex<String>, worker: F)
    where
        F: FnOnce(DFMutex<String>) + Copy + Send + 'static,
    {
        let handles: Vec<_> = THREADS_RANGE.map(|_| spawn(shared, worker)).collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Each worker sleeps for one second while holding the lock.
    #[test]
    pub fn constant_time() {
        let shared = DFMutex::new(String::from("Lorem Ipsum"));

        let worker = |mut dfm: DFMutex<String>| {
            let text = dfm.lock().unwrap();
            thread::sleep(Duration::from_secs(1));
            println!("{}", text);
        };

        run_workers(&shared, worker);
    }

    /// Each worker sleeps for a random number of seconds drawn from `1..3`
    /// while holding the lock.
    #[test]
    pub fn random_time() {
        let shared = DFMutex::new(String::from("Lorem Ipsum"));

        let worker = |mut dfm: DFMutex<String>| {
            let mut rng = thread_rng();
            let text = dfm.lock().unwrap();
            let seconds = rng.gen_range(1..3);
            thread::sleep(Duration::from_secs(seconds));

            println!("{}", text);
        };

        run_workers(&shared, worker);
    }

    /// Each worker runs `compute_intensive_task` while holding the lock.
    #[test]
    pub fn intensive_task() {
        let shared = DFMutex::new(String::from("Lorem Ipsum"));

        let worker = |mut dfm: DFMutex<String>| {
            let text = dfm.lock().unwrap();

            let result = compute_intensive_task();

            println!("{} {}", text, result);
        };

        run_workers(&shared, worker);
    }
}
176
/// Tests in which every worker locks an outer mutex and then the two inner
/// mutexes it guards, always in the order first, second.
#[cfg(test)]
mod lock_pair_straight_order {
    use rand::thread_rng;
    use rand::Rng;
    use std::thread;
    use std::time::Duration;

    use super::spawn;
    use super::test_commons::*;
    use super::DFMutex;

    /// A mutex guarding a pair of inner string mutexes.
    type PairMutex = DFMutex<(DFMutex<String>, DFMutex<String>)>;

    /// Builds a fresh outer mutex wrapping inner mutexes holding "1" and "2".
    fn new_pair() -> PairMutex {
        let first = DFMutex::new(String::from("1"));
        let second = DFMutex::new(String::from("2"));
        DFMutex::new((first, second))
    }

    /// Spawns one thread per item of `THREADS_RANGE`, each running `worker`
    /// on its own handle to `shared`, and then joins every thread.
    fn run_workers<F>(shared: &PairMutex, worker: F)
    where
        F: FnOnce(PairMutex) + Copy + Send + 'static,
    {
        let handles: Vec<_> = THREADS_RANGE.map(|_| spawn(shared, worker)).collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Workers sleep for one second while holding all three locks.
    #[test]
    pub fn constant_time() {
        let worker = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            thread::sleep(Duration::from_secs(1));

            println!("{} {}", first_data, second_data);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_workers(&shared, worker);
        }
    }

    /// Workers sleep for a random number of seconds drawn from `1..3` while
    /// holding all three locks.
    #[test]
    pub fn random_time() {
        let worker = |mut dfm: PairMutex| {
            let mut rng = thread_rng();
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            let seconds = rng.gen_range(1..3);
            thread::sleep(Duration::from_secs(seconds));

            println!("{} {}", first_data, second_data);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_workers(&shared, worker);
        }
    }

    /// Workers run `compute_intensive_task` while holding all three locks.
    #[test]
    pub fn intensive_task() {
        let worker = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            let result = compute_intensive_task();

            println!("{} {} {}", first_data, second_data, result);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_workers(&shared, worker);
        }
    }
}
283
/// Tests in which half of the workers lock the two inner mutexes in the order
/// first, second and the other half in the order second, first. Every worker
/// locks the outer mutex before touching the inner ones.
#[cfg(test)]
mod lock_pair_swapped_order {
    use rand::thread_rng;
    use rand::Rng;
    use std::thread;
    use std::time::Duration;

    use super::spawn;
    use super::test_commons::*;
    use super::DFMutex;

    /// A mutex guarding a pair of inner string mutexes.
    type PairMutex = DFMutex<(DFMutex<String>, DFMutex<String>)>;

    /// Builds a fresh outer mutex wrapping inner mutexes holding "1" and "2".
    fn new_pair() -> PairMutex {
        let first = DFMutex::new(String::from("1"));
        let second = DFMutex::new(String::from("2"));
        DFMutex::new((first, second))
    }

    /// Spawns one thread per item of `THREADS_RANGE`, alternating between
    /// `straight` (used for the first thread) and `swapped`, and then joins
    /// every thread. All threads are started before the first join.
    fn run_alternating<A, B>(shared: &PairMutex, straight: A, swapped: B)
    where
        A: FnOnce(PairMutex) + Copy + Send + 'static,
        B: FnOnce(PairMutex) + Copy + Send + 'static,
    {
        let handles: Vec<_> = THREADS_RANGE
            .enumerate()
            .map(|(position, _)| {
                if position % 2 == 0 {
                    spawn(shared, straight)
                } else {
                    spawn(shared, swapped)
                }
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// Workers sleep for one second while holding all three locks.
    #[test]
    pub fn constant_time() {
        let straight = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            thread::sleep(Duration::from_secs(1));

            println!("{} {}", first_data, second_data);
        };

        let swapped = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let second_data = second.lock().unwrap();
            let first_data = first.lock().unwrap();

            thread::sleep(Duration::from_secs(1));

            println!("{} {}", second_data, first_data);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_alternating(&shared, straight, swapped);
        }
    }

    /// Workers sleep for a random number of seconds drawn from `1..3` while
    /// holding all three locks.
    #[test]
    pub fn random_time() {
        let straight = |mut dfm: PairMutex| {
            let mut rng = thread_rng();
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            let seconds = rng.gen_range(1..3);
            thread::sleep(Duration::from_secs(seconds));

            println!("{} {}", first_data, second_data);
        };

        let swapped = |mut dfm: PairMutex| {
            let mut rng = thread_rng();
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let second_data = second.lock().unwrap();
            let first_data = first.lock().unwrap();

            let seconds = rng.gen_range(1..3);
            thread::sleep(Duration::from_secs(seconds));

            println!("{} {}", second_data, first_data);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_alternating(&shared, straight, swapped);
        }
    }

    /// Workers run `compute_intensive_task` while holding all three locks.
    #[test]
    pub fn intensive_task() {
        let straight = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let first_data = first.lock().unwrap();
            let second_data = second.lock().unwrap();

            let result = compute_intensive_task();

            println!("{} {} {}", first_data, second_data, result);
        };

        let swapped = |mut dfm: PairMutex| {
            let mut outer = dfm.lock().unwrap();
            let (first, second) = &mut *outer;

            let second_data = second.lock().unwrap();
            let first_data = first.lock().unwrap();

            let result = compute_intensive_task();

            println!("{} {} {}", second_data, first_data, result);
        };

        for _ in TEST_ITERATIONS {
            let shared = new_pair();
            run_alternating(&shared, straight, swapped);
        }
    }
}
446
447
/// Dining-philosophers scenario built from handles duplicated with the
/// crate-private `DFMutex::clone`, bypassing `spawn`.
#[cfg(test)]
mod dining_philisophers {
    use std::thread;
    use std::time::Duration;

    use super::DFMutex;

    /// Range iterated to repeat the whole scenario.
    const ITERATIONS: std::ops::Range<i32> = 0..500;
    /// Fork numbers; also used as the philosopher ids.
    const FORK_RANGE: std::ops::RangeInclusive<i32> = 1..=5;

    /// A philosopher holding handles to the forks on either side.
    struct Philosopher {
        id: i32,
        left: DFMutex<String>,
        right: DFMutex<String>,
    }

    impl Philosopher {
        /// Creates a philosopher with the given id and fork handles.
        pub fn new(id: i32, left: DFMutex<String>, right: DFMutex<String>) -> Self {
            Self { id, left, right }
        }

        /// Pauses for 100 microseconds without holding any fork.
        pub fn think(&self) {
            thread::sleep(Duration::from_micros(100));
        }

        /// Locks the left fork, then the right fork, pauses for 100
        /// microseconds, and releases the forks in the same order.
        pub fn eat(&mut self) {
            let left_guard = self.left.lock().unwrap();
            println!("{} Acquired L -> {}", self.id, left_guard);
            let right_guard = self.right.lock().unwrap();
            println!("{} Acquired R -> {}", self.id, right_guard);

            thread::sleep(Duration::from_micros(100));

            drop(left_guard);
            drop(right_guard);
        }
    }

    /// Runs the classic scenario: five philosophers around five forks, each
    /// taking the left fork first and the right fork second.
    ///
    /// Ignored by default because, as the attribute states, it is deadlock
    /// prone: the last philosopher's right fork is the first philosopher's
    /// left fork, so the lock order forms a cycle.
    #[ignore = "Test is deadlock prone"]
    #[test]
    pub fn std() {
        for round in ITERATIONS {
            println!("===== Iteration {} =====", round);

            let forks: Vec<DFMutex<String>> = FORK_RANGE
                .map(|number| DFMutex::new(format!("Fork {}", number)))
                .collect();

            // Philosopher `id` sits at `seat = id - 1`, with fork `seat` on the
            // left and the next fork (wrapping around) on the right.
            let philosophers: Vec<Philosopher> = FORK_RANGE
                .zip(0usize..)
                .map(|(id, seat)| {
                    let left = forks[seat].clone();
                    let right = forks[(seat + 1) % forks.len()].clone();
                    Philosopher::new(id, left, right)
                })
                .collect();

            // Threads are started from the last philosopher to the first, and
            // all of them are started before any is joined.
            let handles: Vec<_> = philosophers
                .into_iter()
                .rev()
                .map(|mut phil| {
                    thread::spawn(move || {
                        phil.think();
                        phil.eat();
                    })
                })
                .collect();

            for handle in handles {
                handle.join().unwrap();
            }
        }
    }
}