mpmc/
mpmc.rs

1use std::thread::scope;
2use std::time::Instant;
3
4use concurrent_map::ConcurrentMap;
5
6const PRODUCERS: usize = 128;
7const CONSUMERS: usize = 128;
8const N: usize = 1024 * 1024;
9const PRODUCER_N: usize = N / PRODUCERS;
10const CONSUMER_N: usize = N / CONSUMERS;
11
12fn producer(cm: ConcurrentMap<usize, usize>, min: usize, max: usize) {
13    for i in min..max {
14        cm.insert(i, i);
15    }
16}
17
18fn consumer(cm: ConcurrentMap<usize, usize>, n: usize) {
19    let mut popped = 0;
20    while popped < n {
21        if let Some((k, v)) = cm.pop_first() {
22            assert_eq!(k, v);
23            popped += 1;
24        }
25    }
26}
27
28fn main() {
29    let cm = ConcurrentMap::default();
30
31    let before = Instant::now();
32    scope(|s| {
33        let mut handles = vec![];
34
35        for i in 0..PRODUCERS {
36            let min = i * PRODUCER_N;
37            let max = (i + 1) * PRODUCER_N;
38            let cm = cm.clone();
39            let handle = s.spawn(move || producer(cm, min, max));
40            handles.push(handle);
41        }
42
43        for _ in 0..CONSUMERS {
44            let cm = cm.clone();
45            let handle = s.spawn(move || consumer(cm, CONSUMER_N));
46            handles.push(handle);
47        }
48
49        for handle in handles.into_iter() {
50            handle.join().unwrap()
51        }
52    });
53
54    let elapsed = before.elapsed();
55
56    let per_second = N as u128 * 1000 / elapsed.as_millis();
57
58    println!(
59        "with {} producers and {} consumers, took {:?} to transfer {} items ({} per second)",
60        PRODUCERS, CONSUMERS, elapsed, N, per_second
61    );
62}