1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111

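//! The atomic monitor concurrency utility, created by Phoenix Kahlo.
//!
//! An `AtomMonitor` pairs an atomic value with a monitor so that threads can
//! block until the value satisfies a caller-supplied condition.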

pub extern crate atomic;
pub extern crate time;

extern crate monitor;

pub use atomic::Ordering;

use atomic::Atomic;
use monitor::Monitor;
use time::{precise_time_ns, Duration};

/// An atomic value bundled with a monitor, letting threads read and update the
/// value without locking while still being able to block until it satisfies a
/// condition.
pub struct AtomMonitor<T>
where
    T: Copy,
{
    /// The shared value; read and written atomically.
    data: Atomic<T>,
    /// Number of threads currently registered as waiting on a condition.
    /// Mutations only take the monitor lock to notify when this is non-zero.
    requesting: Atomic<u32>,
    /// Lock and condition pair used to park and wake waiters; it guards no data.
    monitor: Monitor<()>,
}
impl<T: Copy> AtomMonitor<T> {
    /// Creates a monitor whose atomic initially holds `value` and which has no
    /// registered waiters.
    pub fn new(value: T) -> Self {
        AtomMonitor {
            data: Atomic::new(value),
            requesting: Atomic::new(0),
            monitor: Monitor::new(()),
        }
    }

    /// Runs `mutator` against the underlying atomic and returns its result.
    ///
    /// After the mutation, waiters are notified if any thread is currently
    /// registered as waiting. When nobody is waiting the monitor lock is never
    /// touched, which keeps uncontended mutation cheap.
    pub fn mutate<O>(&self, mutator: impl FnOnce(&Atomic<T>) -> O) -> O {
        let out = mutator(&self.data);
        // The mutator picks its own memory ordering, so a store it performed may
        // otherwise be reordered after the load of `requesting` below. Paired
        // with the fence waiters issue after registering, this guarantees that
        // either we observe the waiter's registration (and notify), or the
        // waiter observes our mutation (and does not block on a stale value).
        ::std::sync::atomic::fence(Ordering::SeqCst);
        if self.requesting.load(Ordering::SeqCst) > 0 {
            self.notify_all();
        }
        out
    }

    /// Loads the current value with `Acquire` ordering.
    pub fn get(&self) -> T {
        self.data.load(Ordering::Acquire)
    }

    /// Stores `value` with `Release` ordering and notifies waiters, if any.
    pub fn set(&self, value: T) {
        self.mutate(|atomic| atomic.store(value, Ordering::Release));
    }

    /// Blocks the calling thread until `condition` returns `true` for the
    /// current value, and returns the value that satisfied it.
    ///
    /// If the condition already holds, this returns immediately without
    /// registering as a waiter or taking the monitor lock.
    pub fn wait_until(&self, mut condition: impl FnMut(T) -> bool) -> T {
        let mut value = self.get();
        if condition(value) {
            return value;
        }

        // Register as a waiter so that mutators start notifying.
        self.requesting.fetch_add(1, Ordering::SeqCst);
        // Pairs with the fence in `mutate`; see the comment there.
        ::std::sync::atomic::fence(Ordering::SeqCst);

        // Re-check outside the lock: a mutation may have landed before we
        // registered, in which case nobody will notify us for it.
        value = self.get();
        if !condition(value) {
            self.monitor.with_lock(|mut guard| loop {
                value = self.get();
                if condition(value) {
                    break;
                }
                guard.wait();
            });
        }

        self.requesting.fetch_sub(1, Ordering::SeqCst);
        value
    }

    /// Like [`wait_until`](#method.wait_until), but gives up once `timeout`
    /// has elapsed.
    ///
    /// Returns `Some(value)` with the value that satisfied `condition`, or
    /// `None` if the deadline passed first. The condition is always checked at
    /// least once, so a zero or negative `timeout` acts as a non-blocking poll.
    /// Timeouts too large to express in nanoseconds are clamped instead of
    /// panicking.
    pub fn wait_until_timeout(
        &self,
        mut condition: impl FnMut(T) -> bool,
        timeout: Duration,
    ) -> Option<T> {
        const NANOS_PER_SEC: u64 = 1_000_000_000;

        // `num_nanoseconds` yields `None` when the span overflows an `i64`;
        // the sign of `timeout` tells us in which direction it overflowed.
        let budget_ns = match timeout.num_nanoseconds() {
            Some(ns) if ns > 0 => ns as u64,
            Some(_) => 0,
            None if timeout < Duration::zero() => 0,
            None => i64::max_value() as u64,
        };
        let end_time = precise_time_ns().saturating_add(budget_ns);

        let mut value = self.get();
        if condition(value) {
            return Some(value);
        }

        // Register as a waiter so that mutators start notifying.
        self.requesting.fetch_add(1, Ordering::SeqCst);
        // Pairs with the fence in `mutate`; see the comment there.
        ::std::sync::atomic::fence(Ordering::SeqCst);

        // Re-check outside the lock before committing to blocking.
        value = self.get();
        let satisfied = if condition(value) {
            true
        } else {
            self.monitor.with_lock(|mut guard| loop {
                value = self.get();
                if condition(value) {
                    break true;
                }
                // Read the clock once so the deadline test and the remaining
                // time are computed from the same instant.
                let now = precise_time_ns();
                if now >= end_time {
                    break false;
                }
                let remaining = end_time - now;
                guard.wait_timeout(::std::time::Duration::new(
                    remaining / NANOS_PER_SEC,
                    (remaining % NANOS_PER_SEC) as u32,
                ));
            })
        };

        self.requesting.fetch_sub(1, Ordering::SeqCst);
        if satisfied {
            Some(value)
        } else {
            None
        }
    }

    /// Takes the monitor lock and calls `notify_all` on it, prompting blocked
    /// waiters to re-evaluate their conditions.
    pub fn notify_all(&self) {
        self.monitor.with_lock(|guard| guard.notify_all());
    }
}