// Keyed call de-duplication: `Group::work` runs a closure on behalf of a
// string key, and a caller that finds a call already registered for that key
// receives a clone of that call's result instead of running its own closure.
use std::fmt::Debug;
use std::sync::Arc;

use crossbeam_utils::sync::WaitGroup;
use hashbrown::HashMap;
use parking_lot::Mutex;
#[derive(Clone, Debug)]
struct Call<T>
where
T: Clone + Debug,
{
wg: WaitGroup,
res: Option<T>,
}
impl<T> Call<T>
where
T: Clone + Debug,
{
fn new() -> Call<T> {
Call {
wg: WaitGroup::new(),
res: None,
}
}
}
#[derive(Default)]
pub struct Group<T>
where
T: Clone + Debug,
{
m: Mutex<HashMap<String, Box<Call<T>>>>,
}
impl<T> Group<T>
where
T: Clone + Debug,
{
pub fn new() -> Group<T> {
Group {
m: Mutex::new(HashMap::new()),
}
}
pub fn work<F>(&self, key: &str, func: F) -> T
where
F: Fn() -> T,
{
let mut m = self.m.lock();
if let Some(c) = m.get(key) {
let c = c.clone();
drop(m);
c.wg.wait();
return c.res.unwrap();
}
let c = Call::new();
let wg = c.wg.clone();
let mut job = m.entry(key.to_owned()).or_insert(Box::new(c));
job.res = Some(func());
drop(m);
drop(wg);
let mut m = self.m.lock();
let c = m.remove(key).unwrap();
drop(m);
c.res.unwrap()
}
}
#[cfg(test)]
mod tests {
    use super::Group;

    /// Value produced by every closure handed to `work` in these tests.
    const RES: usize = 7;

    /// A lone caller gets back exactly what its closure returned.
    #[test]
    fn test_simple() {
        let group = Group::new();
        assert_eq!(group.work("key", || RES), RES);
    }

    /// Ten scoped threads call `work` with the same key; each must see `RES`.
    #[test]
    fn test_multiple_threads() {
        use crossbeam_utils::thread;
        use std::time::Duration;

        // Stand-in for a costly computation: sleeps for 500 ns, then yields `RES`.
        fn expensive_fn() -> usize {
            std::thread::sleep(Duration::from_nanos(500));
            RES
        }

        let group = Group::new();
        let outcome = thread::scope(|scope| {
            for _ in 0..10 {
                scope.spawn(|_| {
                    assert_eq!(group.work("key", expensive_fn), RES);
                });
            }
        });
        // `scope` reports an error if any spawned thread panicked.
        outcome.unwrap();
    }
}