1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

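//! The atomic monitor concurrency utility, created by Phoenix Kahlo.
//!
//! An `AtomMonitor` pairs an atomic value with a monitor so that threads can
//! block until the value satisfies a condition.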

pub extern crate atomic;
pub extern crate time;

extern crate monitor;

pub use atomic::Ordering;

use atomic::Atomic;
use monitor::Monitor;
use time::{precise_time_ns, Duration};

/// An atomically stored `Copy` value that threads can block on until it
/// satisfies a caller-supplied condition.
pub struct AtomMonitor<T>
where
    T: Copy,
{
    /// The monitored value; read by `get` and handed to the closure passed to
    /// `mutate`.
    data: Atomic<T>,
    /// Number of threads currently registered as waiters. The wait methods
    /// increment it before blocking and decrement it before returning;
    /// `mutate` only notifies when it is non-zero.
    requesting: Atomic<u32>,
    /// Lock/condition pair used to park waiters and deliver wake-ups. It
    /// guards no data of its own (hence the `()` payload).
    monitor: Monitor<()>,
}
impl<T: Copy> AtomMonitor<T> {
    /// Creates a monitor holding `value`, with no registered waiters.
    pub fn new(value: T) -> Self {
        AtomMonitor {
            data: Atomic::new(value),
            requesting: Atomic::new(0),
            monitor: Monitor::new(()),
        }
    }

    /// Runs `mutator` once against the underlying atomic and, if any thread is
    /// registered as waiting, wakes all waiters so they re-check their
    /// conditions.
    ///
    /// Returns whatever `mutator` returns.
    pub fn mutate<O>(&self, mut mutator: impl FnMut(&Atomic<T>) -> O) -> O {
        let out = mutator(&self.data);

        // The mutator's write to `data` and the read of `requesting` below are
        // on different atomics, so release/acquire alone allows the read to be
        // satisfied before the write is visible. A waiter could then register,
        // observe the stale value and block while this call skips the
        // notification. The sequentially consistent fence here, paired with
        // the one on the waiter side, rules that interleaving out.
        ::std::sync::atomic::fence(Ordering::SeqCst);

        if self.requesting.load(Ordering::SeqCst) > 0 {
            self.notify_all();
        }
        out
    }

    /// Returns the current value (acquire load).
    pub fn get(&self) -> T {
        self.data.load(Ordering::Acquire)
    }

    /// Stores `value` (release store) and wakes waiters if there are any.
    pub fn set(&self, value: T) {
        self.mutate(|atomic| atomic.store(value, Ordering::Release));
    }

    /// Blocks the calling thread until `condition` returns `true` for the
    /// current value, then returns the value that satisfied it.
    ///
    /// Returns immediately, without touching the lock, if the condition
    /// already holds.
    pub fn wait_until(&self, mut condition: impl FnMut(T) -> bool) -> T {
        let mut value = self.get();
        if condition(value) {
            return value;
        }

        // Register as a waiter so that `mutate` starts notifying.
        self.requesting.fetch_add(1, Ordering::SeqCst);
        // Pairs with the fence in `mutate`: either the re-read below observes
        // a concurrent mutation, or that mutation observes the registration
        // and notifies under the lock.
        ::std::sync::atomic::fence(Ordering::SeqCst);

        value = self.get();
        if !condition(value) {
            self.monitor.with_lock(|mut guard| loop {
                // Re-check while holding the lock; a notification cannot slip
                // in between this check and the wait.
                value = self.get();
                if condition(value) {
                    break;
                }
                guard.wait();
            });
        }

        self.requesting.fetch_sub(1, Ordering::SeqCst);
        value
    }

    /// Like `wait_until`, but gives up once `timeout` has elapsed.
    ///
    /// Returns `Some(value)` with the value that satisfied `condition`, or
    /// `None` if the deadline passed first. A zero or negative `timeout`
    /// still checks the condition before returning `None`. A `timeout` too
    /// large to express in nanoseconds is clamped instead of panicking.
    pub fn wait_until_timeout(
        &self,
        mut condition: impl FnMut(T) -> bool,
        timeout: Duration,
    ) -> Option<T> {
        // `num_nanoseconds` yields `None` when the span overflows an i64;
        // saturate in the direction of the overflow.
        let timeout_ns = timeout.num_nanoseconds().unwrap_or_else(|| {
            if timeout < Duration::zero() {
                i64::min_value()
            } else {
                i64::max_value()
            }
        });
        // i128 so that "now + timeout" cannot overflow.
        let end_time = i128::from(precise_time_ns()) + i128::from(timeout_ns);

        let mut value = self.get();
        if condition(value) {
            return Some(value);
        }

        // Register as a waiter so that `mutate` starts notifying.
        self.requesting.fetch_add(1, Ordering::SeqCst);
        // Pairs with the fence in `mutate`; see `wait_until`.
        ::std::sync::atomic::fence(Ordering::SeqCst);

        value = self.get();
        let satisfied = if condition(value) {
            true
        } else {
            self.monitor.with_lock(|mut guard| loop {
                value = self.get();
                if condition(value) {
                    break true;
                }

                // Read the clock once per iteration so the expiry check and
                // the wait length agree.
                let remaining = end_time - i128::from(precise_time_ns());
                if remaining <= 0 {
                    break false;
                }
                // 0 < remaining <= i64::MAX here, so the cast is lossless.
                guard.wait_timeout(::std::time::Duration::from_nanos(remaining as u64));
            })
        };

        self.requesting.fetch_sub(1, Ordering::SeqCst);
        if satisfied {
            Some(value)
        } else {
            None
        }
    }

    /// Wakes every thread currently blocked in one of the wait methods so it
    /// re-evaluates its condition.
    pub fn notify_all(&self) {
        self.monitor.with_lock(|guard| guard.notify_all());
    }
}