1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
pub extern crate atomic;
pub extern crate time;
extern crate monitor;
pub use atomic::Ordering;
use atomic::Atomic;
use monitor::Monitor;
use time::{precise_time_ns, Duration};
pub struct AtomMonitor<T: Copy> {
data: Atomic<T>,
requesting: Atomic<u32>,
monitor: Monitor<()>
}
impl<T: Copy> AtomMonitor<T> {
pub fn new(value: T) -> Self {
AtomMonitor {
data: Atomic::new(value),
requesting: Atomic::new(0),
monitor: Monitor::new(())
}
}
pub fn mutate<O>(&self, mut mutator: impl FnMut(&Atomic<T>) -> O) -> O {
let out = mutator(&self.data);
let requesting = self.requesting.load(Ordering::Acquire);
if requesting > 0 {
self.monitor.with_lock(|guard| guard.notify_all());
} else {
}
out
}
pub fn get(&self) -> T {
self.data.load(Ordering::Acquire)
}
pub fn set(&self, value: T) {
self.mutate(|atomic| atomic.store(value, Ordering::Release));
}
pub fn wait_until(&self, mut condition: impl FnMut(T) -> bool) -> T {
let mut value = self.get();
if !condition(value) {
self.requesting.fetch_add(1, Ordering::SeqCst);
value = self.get();
if !condition(value) {
self.monitor.with_lock(|mut guard| {
while {
value = self.get();
!condition(value)
} {
guard.wait();
}
});
}
self.requesting.fetch_sub(1, Ordering::SeqCst);
}
value
}
pub fn wait_until_timeout(&self, mut condition: impl FnMut(T) -> bool, timeout: Duration) -> Option<T> {
let end_time = precise_time_ns() as i128 + timeout.num_nanoseconds().unwrap() as i128;
let mut value = self.get();
if !condition(value) {
self.requesting.fetch_add(1, Ordering::SeqCst);
let satisfied =
if {
value = self.get();
condition(value)
} {
true
} else {
self.monitor.with_lock(|mut guard| loop {
if {
value = self.get();
condition(value)
} {
break true;
} else if precise_time_ns() as i128 > end_time {
break false;
} else {
if let Ok(remaining) = Duration::nanoseconds(end_time as i64 - precise_time_ns() as i64).to_std() {
guard.wait_timeout(remaining);
}
}
})
};
self.requesting.fetch_sub(1, Ordering::SeqCst);
if satisfied {
Some(value)
} else {
None
}
} else {
Some(value)
}
}
pub fn notify_all(&self) {
self.monitor.with_lock(|guard| guard.notify_all());
}
}