use super::semaphore::Sem;
use atomic::Ordering;
use std::sync::atomic::AtomicUsize;
pub struct SingleWriterSynchronizer {
enter: AtomicUsize,
exit: [AtomicUsize; 2],
waiting_for: AtomicUsize,
wakeup: Sem,
}
impl SingleWriterSynchronizer {
#[inline]
pub fn enter(&self) -> usize {
self.enter.fetch_add(2, Ordering::Relaxed) + 2
}
#[inline]
pub fn exit(&self, enter_value: usize) {
let exit_value = self.exit[enter_value & 1].fetch_add(2, Ordering::Relaxed) + 2;
if exit_value == self.waiting_for.load(Ordering::Relaxed) {
self.wakeup.signal(1);
}
}
pub fn synchronize(&self) {
atomic::fence(Ordering::SeqCst);
let mut value = self.enter.load(Ordering::Relaxed);
let new_ptr = &self.exit[(value + 1) & 1];
let mut old;
loop {
old = value;
value += 1;
new_ptr.store(value, Ordering::Relaxed);
match self
.enter
.compare_exchange_weak(old, value, Ordering::Relaxed, Ordering::Relaxed)
{
Ok(_) => break,
Err(v) => value = v,
}
}
let old_ptr = &self.exit[old & 1];
self.waiting_for.store(old, Ordering::Relaxed);
atomic::fence(Ordering::SeqCst);
while old != old_ptr.load(Ordering::Acquire) {
self.wakeup.wait();
}
while self.wakeup.try_wait() {}
}
pub fn new() -> Self {
Self {
enter: AtomicUsize::new(0),
exit: [AtomicUsize::new(0), AtomicUsize::new(0)],
waiting_for: AtomicUsize::new(1),
wakeup: Sem::new(0).unwrap(),
}
}
}
pub struct CriticalSection<'a> {
synchronizer: &'a SingleWriterSynchronizer,
enter_value: usize
}
impl<'a> CriticalSection<'a> {
pub fn new(synchronizer: &'a SingleWriterSynchronizer) -> Self {
Self {
synchronizer,
enter_value: synchronizer.enter(),
}
}
}
impl<'a> Drop for CriticalSection<'a> {
fn drop(&mut self) {
self.synchronizer.exit(self.enter_value);
}
}
#[cfg(test)]
mod tests {
use std::{sync::{Arc, atomic::AtomicI32}, time::Duration};
use super::*;
const NREADERS: usize = 5;
#[test]
fn test_single_writer_synchronizer() {
let synchronizer = SingleWriterSynchronizer::new();
let synchronized_value = AtomicUsize::new(0);
let continue_running = AtomicI32::new(1);
std::thread::scope(|scope| {
let readers = (0..NREADERS).map(|_| {
scope.spawn(|| {
let mut iterations = 0;
let mut values_changed = 0;
while continue_running.load(Ordering::Acquire) != 0 {
iterations += 1;
let cs = CriticalSection::new(&synchronizer);
let value = synchronized_value.load(Ordering::Acquire);
let mut new_value = value;
for _ in 0..10 {
new_value = synchronized_value.load(Ordering::Acquire);
if value != new_value {
assert_eq!(value + 1, new_value);
}
}
if value != new_value {
values_changed += 1;
}
drop(cs);
}
println!("reader iterations: {}, changes: {}", iterations, values_changed);
})
}).collect::<Vec<_>>();
let writer = scope.spawn(|| {
while continue_running.load(Ordering::Acquire) != 0 {
synchronized_value.fetch_add(1, Ordering::Relaxed);
synchronizer.synchronize();
}
println!("writer iterations: {}", synchronized_value.load(Ordering::Acquire));
});
std::thread::sleep(Duration::from_millis(1000));
continue_running.store(0, Ordering::Release);
for reader in readers {
reader.join().unwrap();
}
writer.join().unwrap();
});
}
}