1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#[cfg(test)]
mod ebr_model {
use loom::sync::atomic::fence;
use loom::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::Arc;
const INACTIVE: u8 = 1_u8 << 7;
/// Models one thread's collector state in an epoch-based-reclamation (EBR)
/// scheme; its interactions are exhaustively checked under loom.
struct ModelCollector {
// This thread's announced epoch; the top bit (`INACTIVE`) is set while the
// thread is outside a critical section.
state: AtomicU8,
// Number of epoch transitions this thread has observed so far.
epoch_witnessed: AtomicUsize,
// Total number of objects ever retired via `collect`.
num_collected: AtomicUsize,
// Per-epoch retired-object counts, indexed by `epoch_witnessed % 3`.
collected: [AtomicUsize; 3],
}
impl ModelCollector {
/// Enters a critical section: publishes the current global epoch as this
/// thread's local epoch (which also clears the `INACTIVE` bit), then
/// fences so the announcement is ordered before anything the critical
/// section subsequently does. If the global epoch advanced since this
/// thread last looked, local bookkeeping is rotated via `epoch_updated`.
fn new_barrier(&self, epoch: &AtomicU8, ptr: &ModelPointer) {
let global_epoch = epoch.load(Relaxed);
// Strip the INACTIVE flag to recover the epoch this thread last witnessed.
let known_epoch = self.state.load(Relaxed) & (!INACTIVE);
// Announce the freshly read global epoch; storing a value without the
// INACTIVE bit marks this thread as active.
self.state.store(global_epoch, Relaxed);
// SeqCst fence: the announcement store must not be reordered after the
// critical section's accesses — this fence is what the model verifies.
fence(SeqCst);
if global_epoch != known_epoch {
self.epoch_updated(ptr);
}
}
/// Invoked whenever this thread observes an epoch transition: bumps the
/// witnessed-epoch counter and drains the garbage bag that has now aged
/// far enough to be safe to reclaim.
fn epoch_updated(&self, ptr: &ModelPointer) {
self.epoch_witnessed.fetch_add(1, Relaxed);
// Drain the bag at the new index; with three bags cycling, its contents
// were retired two observed transitions ago (the EBR safety window).
if self.collected[self.epoch_witnessed.load(Relaxed) % 3].swap(0, Relaxed) > 0 {
// Safety properties under test: reclamation only after the pointer was
// retired, and reclamation happens at most once.
assert!(ptr.unreachable.load(Relaxed));
assert!(!ptr.reclaimed.swap(true, Relaxed));
}
}
/// Retires an object: records it in the garbage bag associated with the
/// currently witnessed epoch.
fn collect(&self) {
self.collected[self.epoch_witnessed.load(Relaxed) % 3].fetch_add(1, Relaxed);
self.num_collected.fetch_add(1, Relaxed);
}
/// Leaves a critical section and, when consensus allows, advances the
/// global epoch. Advancing is permitted only if the (single) other
/// collector is either inactive or has already announced this thread's
/// epoch.
fn end_barrier(&self, epoch: &AtomicU8, ptr: &ModelPointer, other: &ModelCollector) {
let mut known_epoch = self.state.load(Relaxed);
let other_epoch = other.state.load(Relaxed);
if (other_epoch & INACTIVE) == INACTIVE || other_epoch == known_epoch {
// Epochs cycle 0 -> 1 -> 2 -> 0; the catch-all arm also normalizes any
// value outside that range back to 0.
let new = match known_epoch {
0 => 1,
1 => 2,
_ => 0,
};
// Order the consensus check above before publishing the new epoch.
fence(SeqCst);
epoch.store(new, Relaxed);
self.state.store(new, Relaxed);
known_epoch = new;
self.epoch_updated(ptr);
}
// Mark this thread inactive while remembering the last epoch it saw.
self.state.store(known_epoch | INACTIVE, Relaxed);
}
}
impl Default for ModelCollector {
    /// A fresh collector starts active at epoch 1 with nothing witnessed,
    /// nothing retired, and all three garbage bags empty.
    fn default() -> Self {
        Self {
            epoch_witnessed: AtomicUsize::new(0),
            num_collected: AtomicUsize::new(0),
            collected: Default::default(),
            state: AtomicU8::new(1),
        }
    }
}
/// Models the lifecycle of a single shared object protected by EBR.
#[derive(Default)]
struct ModelPointer {
// Set once the object has been retired (unlinked); checked before reclaim.
unreachable: AtomicBool,
// Set when the memory is reclaimed; asserted to flip at most once, and
// only after `unreachable` is set.
reclaimed: AtomicBool,
}
#[test]
#[ignore]
fn ebr() {
    // Counts how many of the explored interleavings actually reclaimed the
    // pointer; at least one interleaving must do so for the model to be
    // meaningful.
    let times_reclaimed = Arc::new(AtomicUsize::new(0));
    let counter = times_reclaimed.clone();
    let mut model = loom::model::Builder::new();
    model.max_threads = 2;
    model.check(move || {
        let epoch: Arc<AtomicU8> = Arc::default();
        let collectors: Arc<(ModelCollector, ModelCollector)> = Arc::default();
        let ptr: Arc<ModelPointer> = Arc::default();
        let epoch_for_thread = epoch.clone();
        let collectors_for_thread = collectors.clone();
        let ptr_for_thread = ptr.clone();
        let thread = loom::thread::spawn(move || {
            let epoch = epoch_for_thread.as_ref();
            let collector = &collectors_for_thread.0;
            let target = ptr_for_thread.as_ref();
            // Retire the pointer inside the first critical section.
            collector.new_barrier(epoch, target);
            assert!(!target.unreachable.swap(true, Relaxed));
            collector.collect();
            collector.end_barrier(epoch, target, &collectors_for_thread.1);
            // Three further empty critical sections give the epoch room to
            // advance enough for the retired pointer to become reclaimable.
            for _ in 0..3 {
                collector.new_barrier(epoch, target);
                collector.end_barrier(epoch, target, &collectors_for_thread.1);
            }
        });
        let collector = &collectors.1;
        collector.new_barrier(&epoch, &ptr);
        // Inside a critical section the pointer may already be retired, but
        // it must never be observed reclaimed without being unreachable.
        assert!(ptr.unreachable.load(Relaxed) || !ptr.reclaimed.load(Relaxed));
        collector.end_barrier(&epoch, &ptr, &collectors.0);
        if ptr.reclaimed.load(Relaxed) {
            counter.fetch_add(1, Relaxed);
        }
        // NOTE(review): the join result is deliberately discarded — presumably
        // loom surfaces child-thread panics through the model itself; confirm.
        drop(thread.join());
    });
    assert!(times_reclaimed.load(Relaxed) > 0);
}
}