use std::cell::UnsafeCell;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use core::fmt;
use parking_lot::Mutex;
struct LeftRightShared<T> {
live_idx: AtomicUsize,
data: [UnsafeCell<T>; 2],
active_readers: [AtomicUsize; 2],
}
impl<T> fmt::Debug for LeftRightShared<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LeftRightShared")
.field("live_idx", &self.live_idx.load(Ordering::Relaxed))
.finish_non_exhaustive()
}
}
unsafe impl<T: Send> Send for LeftRightShared<T> {}
unsafe impl<T: Send + Sync> Sync for LeftRightShared<T> {}
#[derive(Debug)]
pub(crate) struct ReadHandle<T> {
shared: Arc<LeftRightShared<T>>,
}
#[derive(Debug)]
pub(crate) struct WriteHandle<T> {
shared: Arc<LeftRightShared<T>>,
writer_lock: Mutex<()>,
}
pub(crate) struct ReadGuard<T> {
shared: Arc<LeftRightShared<T>>,
idx: usize,
data: *const T,
}
impl<T> fmt::Debug for ReadGuard<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadGuard")
.field("idx", &self.idx)
.finish_non_exhaustive()
}
}
unsafe impl<T: Send + Sync> Send for ReadGuard<T> {}
unsafe impl<T: Sync> Sync for ReadGuard<T> {}
pub(crate) fn new<T: Clone + Default>() -> (ReadHandle<T>, WriteHandle<T>) {
let shared = Arc::new(LeftRightShared {
live_idx: AtomicUsize::new(0),
data: [UnsafeCell::new(T::default()), UnsafeCell::new(T::default())],
active_readers: [AtomicUsize::new(0), AtomicUsize::new(0)],
});
let rh = ReadHandle {
shared: shared.clone(),
};
let wh = WriteHandle {
shared,
writer_lock: Mutex::new(()),
};
(rh, wh)
}
impl<T> ReadHandle<T> {
pub(crate) fn enter(&self) -> ReadGuard<T> {
loop {
let idx = self.shared.live_idx.load(Ordering::Acquire);
self.shared.active_readers[idx].fetch_add(1, Ordering::SeqCst);
if self.shared.live_idx.load(Ordering::Relaxed) == idx {
let data = unsafe { &*self.shared.data[idx].get() } as *const T;
return ReadGuard {
shared: self.shared.clone(),
idx,
data,
};
}
self.shared.active_readers[idx].fetch_sub(1, Ordering::SeqCst);
}
}
}
impl<T> Clone for ReadHandle<T> {
fn clone(&self) -> Self {
Self {
shared: self.shared.clone(),
}
}
}
impl<T> Deref for ReadGuard<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.data }
}
}
impl<T> Drop for ReadGuard<T> {
fn drop(&mut self) {
self.shared.active_readers[self.idx].fetch_sub(1, Ordering::SeqCst);
}
}
impl<T: Clone> WriteHandle<T> {
pub(crate) fn modify<F>(&self, f: F)
where
F: FnOnce(&mut T),
{
let _lock = self.writer_lock.lock();
let live_idx = self.shared.live_idx.load(Ordering::Relaxed);
let write_idx = 1 - live_idx;
while self.shared.active_readers[write_idx].load(Ordering::Acquire) > 0 {
std::hint::spin_loop();
}
let live_data = unsafe { &*self.shared.data[live_idx].get() };
let write_data = unsafe { &mut *self.shared.data[write_idx].get() };
write_data.clone_from(live_data);
f(write_data);
self.shared.live_idx.store(write_idx, Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn initial_state_is_default() {
let (rh, _wh) = new::<i32>();
let guard = rh.enter();
assert_eq!(*guard, 0); }
#[test]
fn write_and_read_back() {
let (rh, wh) = new::<String>();
assert_eq!(*rh.enter(), "");
wh.modify(|s| *s = "hello".to_string());
assert_eq!(*rh.enter(), "hello");
wh.modify(|s| s.push_str(" world"));
assert_eq!(*rh.enter(), "hello world");
}
#[test]
fn cloned_read_handle_sees_updates() {
let (rh, wh) = new::<usize>();
let rh_clone = rh.clone();
assert_eq!(*rh.enter(), 0);
assert_eq!(*rh_clone.enter(), 0);
wh.modify(|val| *val = 100);
assert_eq!(*rh.enter(), 100);
assert_eq!(*rh_clone.enter(), 100);
}
#[derive(Clone, Default, Debug, PartialEq, Eq)]
struct TestData {
version: usize,
data: [usize; 8],
}
impl TestData {
fn is_consistent(&self) -> bool {
self.data.iter().all(|&x| x == self.version)
}
}
#[test]
fn concurrent_reads_and_writes_are_consistent() {
let (rh, wh) = new::<TestData>();
let rh = Arc::new(rh);
let wh = Arc::new(wh);
let writer_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
let mut reader_handles = Vec::new();
for _ in 0..4 {
let rh_clone = rh.clone();
let writer_finished_clone = writer_finished.clone();
reader_handles.push(thread::spawn(move || {
let mut last_seen_version = 0;
while !writer_finished_clone.load(Ordering::Acquire) {
let guard = rh_clone.enter();
assert!(
guard.is_consistent(),
"Inconsistent read detected: {:?}",
*guard
);
assert!(
guard.version >= last_seen_version,
"Version went backwards!"
);
last_seen_version = guard.version;
thread::yield_now();
}
}));
}
let writer_handle = thread::spawn(move || {
for i in 1..=100 {
wh.modify(|d| {
d.version = i;
d.data.iter_mut().for_each(|item| *item = i);
});
thread::sleep(Duration::from_micros(10)); }
writer_finished.store(true, Ordering::Release);
});
writer_handle.join().expect("Writer thread panicked");
for handle in reader_handles {
handle.join().expect("Reader thread panicked");
}
let final_guard = rh.enter();
assert_eq!(final_guard.version, 100);
assert!(final_guard.is_consistent());
}
}