1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
//! Support module for `WriterReaderPhaser` and related structs.
//!
//! See the documentation of the [`WriterReaderPhaser`] struct for more information.
//!
//! [`WriterReaderPhaser`]: struct.WriterReaderPhaser.html

use std::sync::{Arc, Mutex, LockResult, MutexGuard};
use std::sync::atomic::{AtomicIsize, Ordering};
use std::isize::MIN as ISIZE_MIN;
use std::time::Duration;
use std::thread;

/// A synchronization primitive that allows for multiple concurrent wait-free "writer critical
/// sections" and a "reader phase flip" that can wait for all currently-active writers to finish.
///
/// The basic interaction setup for a `WriterReaderPhaser` is as follows:
///
/// * Any number of writers can open and close a "writer critical section" with no waiting.
/// * Zero or one readers can be active at one time, by holding a "read lock". Any reader who
///   wishes to open a "read lock" while another one is active is blocked until the previous one
///   finishes.
/// * The holder of a read lock may request a "phase flip", which causes the reader to wait until
///   all current writer critical sections are finished before continuing.
///
/// `WriterReaderPhaser` is a port of the primitive of the same name from `HdrHistogram`. For a
/// summary of the rationale behind its design, see [this post by its author][wrp-blog]. Part of
/// its assumptions is that this primitive is synchronizing access to a double-buffered set of
/// counters, and the readers are expected to swap the buffers while holding a read lock but before
/// flipping the phase. This allows them to access a stable sample to read and perform calculations
/// from, while writers still have wait-free synchronization.
///
/// [wrp-blog]: https://stuff-gil-says.blogspot.com/2014/11/writerreaderphaser-story-about-new.html
///
/// "Writer critical sections" and "read locks" are represented by guard structs that allow
/// scope-based resource management of the counters and locks.
///
/// * The `PhaserCriticalSection` atomically increments and decrements the phase counters upon
///   creation and drop. These operations use `std::sync::atomic::AtomicIsize` from the standard
///   library, and provide no-wait handling for platforms with atomic addition instructions.
/// * The `PhaserReadLock` is kept in the `WriterReaderPhaser` as a Mutex, enforcing the mutual
///   exclusion of the read lock. The "phase flip" operation is defined on the read lock guard
///   itself, enforcing that only the holder of a read lock can execute one.
pub struct WriterReaderPhaser {
    start_epoch: Arc<AtomicIsize>,
    even_end_epoch: Arc<AtomicIsize>,
    odd_end_epoch: Arc<AtomicIsize>,
    read_lock: Mutex<PhaserReadLock>,
}

/// Guard struct that represents a "writer critical section" for a `WriterReaderPhaser`.
///
/// `PhaserCriticalSection` is a scope-based guard to signal the beginning and end of a "writer
/// critical section" to the phaser. Upon calling `writer_critical_section`, the phaser atomically
/// increments a counter, and when the returned `PhaserCriticalSection` drops, the `drop` call
/// atomically increments another counter. On platforms with atomic increment instructions, this
/// should result in wait-free synchronization.
///
/// # Example
///
/// ```
/// # let phaser = synchronoise::WriterReaderPhaser::new();
/// {
///     let _guard = phaser.writer_critical_section();
///     // perform writes
/// } // _guard drops, signaling the end of the section
/// ```
pub struct PhaserCriticalSection {
    end_epoch: Arc<AtomicIsize>,
}

/// Upon drop, a `PhaserCriticalSection` will signal its parent `WriterReaderPhaser` that the
/// critical section has ended.
impl Drop for PhaserCriticalSection {
    fn drop(&mut self) {
        self.end_epoch.fetch_add(1, Ordering::Release);
    }
}

/// Guard struct for a `WriterReaderPhaser` that allows a reader to perform a "phase flip".
///
/// The `PhaserReadLock` struct allows one to perform a "phase flip" on its parent
/// `WriterReaderPhaser`. It is held in a `std::sync::Mutex` in its parent phaser, enforcing that
/// only one reader may be active at once.
///
/// The `flip_phase` call performs a spin-wait while waiting the the currently-active writers to
/// finish. A sleep time may be added between checks by calling `flip_with_sleep` instead.
///
/// # Example
///
/// ```
/// # let phaser = synchronoise::WriterReaderPhaser::new();
/// {
///     let lock = phaser.read_lock().unwrap();
///     // swap buffers
///     lock.flip_phase();
///     // reader now has access to a stable snapshot
/// } // lock drops, relinquishing the read lock and allowing another reader to lock
/// ```
pub struct PhaserReadLock {
    start_epoch: Arc<AtomicIsize>,
    even_end_epoch: Arc<AtomicIsize>,
    odd_end_epoch: Arc<AtomicIsize>,
}

impl WriterReaderPhaser {
    /// Creates a new `WriterReaderPhaser`.
    pub fn new() -> WriterReaderPhaser {
        let start = Arc::new(AtomicIsize::new(0));
        let even = Arc::new(AtomicIsize::new(0));
        let odd = Arc::new(AtomicIsize::new(ISIZE_MIN));
        let read_lock = PhaserReadLock {
            start_epoch: start.clone(),
            even_end_epoch: even.clone(),
            odd_end_epoch: odd.clone(),
        };

        WriterReaderPhaser {
            start_epoch: start,
            even_end_epoch: even,
            odd_end_epoch: odd,
            read_lock: Mutex::new(read_lock),
        }
    }

    /// Enters a writer critical section, returning a guard object that signals the end of the
    /// critical section upon drop.
    pub fn writer_critical_section(&self) -> PhaserCriticalSection {
        let flag = self.start_epoch.fetch_add(1, Ordering::Release);

        if flag < 0 {
            PhaserCriticalSection {
                end_epoch: self.odd_end_epoch.clone(),
            }
        } else {
            PhaserCriticalSection {
                end_epoch: self.even_end_epoch.clone(),
            }
        }
    }

    /// Enter a reader criticial section, potentially blocking until a currently active read
    /// section finishes. Returns a guard object that allows the user to flip the phase of the
    /// `WriterReaderPhaser`, and unlocks the read lock upon drop.
    ///
    /// # Errors
    ///
    /// If another reader critical section panicked while holding the read lock, this call will
    /// return an error once the lock is acquired. See the documentation for
    /// `std::sync::Mutex::lock` for details.
    pub fn read_lock(&self) -> LockResult<MutexGuard<PhaserReadLock>> {
        self.read_lock.lock()
    }
}

impl PhaserReadLock {
    /// Wait until all currently-active writer critical sections have completed.
    pub fn flip_phase(&self) {
        self.flip_with_sleep(Duration::default());
    }

    /// Wait until all currently-active writer critical sections have completed. While waiting,
    /// sleep with the given duration.
    pub fn flip_with_sleep(&self, sleep_time: Duration) {
        let next_phase_even = self.start_epoch.load(Ordering::Relaxed) < 0;

        let start_value = if next_phase_even {
            let tmp = 0;
            self.even_end_epoch.store(tmp, Ordering::Relaxed);
            tmp
        } else {
            let tmp = ISIZE_MIN;
            self.odd_end_epoch.store(tmp, Ordering::Relaxed);
            tmp
        };

        let value_at_flip = self.start_epoch.swap(start_value, Ordering::AcqRel);

        let end_epoch = if next_phase_even {
            self.odd_end_epoch.clone()
        } else {
            self.even_end_epoch.clone()
        };

        while end_epoch.load(Ordering::Relaxed) != value_at_flip {
            if sleep_time == Duration::default() {
                thread::yield_now();
            } else {
                thread::sleep(sleep_time);
            }
        }
    }
}