1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362
//! Simpler version of Jon Gjengset's left-right library.
//!
//! Uses two copies of the value to allow making small changes, while still allowing non-blocking reads.
//! Writing can block, while reading doesn't.
#![warn(
clippy::cargo,
clippy::all,
clippy::perf,
clippy::style,
clippy::complexity,
clippy::suspicious,
clippy::correctness,
missing_docs,
missing_copy_implementations,
missing_debug_implementations,
clippy::absolute_paths
)]
#![deny(
unsafe_op_in_unsafe_fn,
clippy::missing_safety_doc,
clippy::undocumented_unsafe_blocks
)]
use std::{
cell::UnsafeCell,
collections::VecDeque,
hint::assert_unchecked,
marker::PhantomData,
mem::MaybeUninit,
ops::Deref,
sync::{
atomic::{fence, AtomicU8, Ordering},
Arc,
},
};
mod inner;
use inner::{Ptr, ReadState, Shared};
/// Should be implemented on structs that want to be shared with this library.
///
/// `O` is the type of the operations that can be applied to the struct.
pub trait Absorb<O> {
/// Applies `operation` to `self`.
///
/// Has to be deterministic. Operations will be applied in the same order to both buffers.
fn absorb(&mut self, operation: O);
}
/// Read access to the value the [`Reader`] locked.
///
/// Data won't change while holding the guard. This also means the [`Writer`] can only issue
/// one swap while the guard is being held.
#[derive(Debug)]
pub struct ReadGuard<'a, T> {
// the value that was locked for reading
data: &'a T,
// the shared state byte; its read bits are cleared again when the guard is dropped
state: &'a AtomicU8,
// PhantomData makes the borrow checker prove that there only ever is one ReadGuard.
//
// This is needed because the read bits may only be reset once no ReadGuard exists anymore.
// Allowing several guards at once would require some kind of counter.
reader: PhantomData<&'a mut Reader<T>>,
}
// `ReadGuard` doesn't have any methods of its own, so it is the only struct in here that
// should implement `Deref`.
impl<T> Deref for ReadGuard<'_, T> {
    type Target = T;

    /// Gives shared access to the locked value.
    fn deref(&self) -> &T {
        self.data
    }
}
impl<T, E> AsRef<E> for ReadGuard<'_, T>
where
E: ?Sized,
T: AsRef<E>,
{
fn as_ref(&self) -> &E {
self.deref().as_ref()
}
}
impl<T> Drop for ReadGuard<'_, T> {
    /// Releases the read lock.
    fn drop(&mut self) {
        // Only the ptr bit survives the `and`, so both read bits are cleared.
        const KEEP_PTR_BIT: u8 = 0b100;
        self.state.fetch_and(KEEP_PTR_BIT, Ordering::Release);
    }
}
/// The reading half. Reading never blocks.
///
/// Dropping the Reader isn't realtime safe, because it deallocates if it is dropped after the Writer.
/// It should only get dropped when closing the real-time thread.
///
/// The Reader is able to read data even if the Writer has been dropped. Obviously that data won't change anymore.
/// When there is no Reader the Writer is able to create a new one. The other way around doesn't work.
#[derive(Debug)]
pub struct Reader<T> {
// state and values shared with the Writer
inner: Arc<Shared<T>>,
}
impl<T> Reader<T> {
/// this function never blocks. (`fetch_update` loop doesn't count)
pub fn lock(&mut self) -> ReadGuard<'_, T> {
// sets the corresponding read bit to the write ptr bit
// happens as a single atomic operation so the 'double read' state isn't needed
// ptr bit doesnt get changed
let update_result =
self.inner
.state
.fetch_update(Ordering::Relaxed, Ordering::Acquire, |value| {
// SAFETY: At this point no Read bit is set, as creating a ReadGuard requires a &mut Reader and the Guard holds the &mut Reader
unsafe {
assert_unchecked(value & 0b011 == 0);
}
match value.into() {
Ptr::Value1 => Some(0b001),
Ptr::Value2 => Some(0b110),
}
});
// SAFETY: the passed clorusure always returns Some, so fetch_update never returns Err
let ptr = unsafe { update_result.unwrap_unchecked().into() };
// SAFETY: the Writer always sets the Read bit to the opposite of its write_ptr
let data = unsafe { self.inner.get_value(ptr).get().as_ref().unwrap_unchecked() };
// SAFETY: the read_state is set to the value that is being
ReadGuard {
data,
state: &self.inner.state,
reader: PhantomData,
}
}
}
// Don't ever create a WriteGuard directly, always go through `WriteGuard::new`
/// Can be used to write to the data structure.
///
/// While this structure exists, the Reader has already switched to the other value.
///
/// Dropping this makes all changes available to the Reader.
#[derive(Debug)]
pub struct WriteGuard<'a, T, O> {
// the Writer this guard was created from; borrowed mutably for as long as the guard exists
writer: &'a mut Writer<T, O>,
}
impl<T, O> WriteGuard<'_, T, O> {
    /// Makes the changes available to the reader.
    ///
    /// The body is empty on purpose: consuming the guard drops it and the swap happens on drop.
    pub fn swap(self) {}

    /// Gets the value currently being written to.
    #[must_use]
    pub fn read(&self) -> &T {
        self.writer.read()
    }

    /// Mutable access to the value currently being written to.
    ///
    /// Isn't public as this could easily create disconnects between the two versions.
    /// While that wouldn't lead to UB it goes against the purpose of this library.
    fn get_data_mut(&mut self) -> &mut T {
        let value = self.get_data_ptr();
        // SAFETY: When creating the WriteGuard it is checked that the Reader doesn't have access to the same data.
        // This function requires &mut self, so there also isn't any other ref created by the WriteGuard.
        let value = unsafe { value.as_mut() };
        value.unwrap()
    }

    /// Raw pointer to the value the Writer's `write_ptr` currently selects.
    fn get_data_ptr(&self) -> *mut T {
        let writer = &*self.writer;
        writer.shared.get_value(writer.write_ptr).get()
    }
}
impl<'a, T: Absorb<O>, O> WriteGuard<'a, T, O> {
    /// Creates a new `WriteGuard` and syncs the two values if needed.
    ///
    /// All operations still waiting in the Writer's buffer are applied, oldest first, to the
    /// value the Writer is about to write to.
    ///
    /// # Safety
    /// No `ReadGuard` is allowed to exist to the same value the `Writer.write_ptr` points to.
    ///
    /// Assuming a correct `Reader` & `ReadGuard` implementation:
    /// if `ReadState::can_write(Writer.write_ptr)` is true for the current state, this function is fine to call.
    unsafe fn new(writer: &'a mut Writer<T, O>) -> Self {
        let mut guard = Self { writer };
        // the buffer is filled at the back, so popping at the front keeps the original order
        while let Some(pending) = guard.writer.op_buffer.pop_front() {
            let target = guard.get_data_mut();
            target.absorb(pending);
        }
        guard
    }
}
impl<T: Absorb<O>, O: Clone> WriteGuard<'_, T, O> {
    /// Applies `operation` to the current write value and makes sure the other value gets it too.
    ///
    /// If there is no Reader and no earlier operation is still waiting in the buffer, the
    /// operation is applied to both values immediately and not stored. Otherwise it is applied
    /// to the write value and stored to be applied to the other value later.
    pub fn apply_op(&mut self, operation: O) {
        // Both values may only be updated directly while they are in sync. If the Reader was
        // dropped after this guard already buffered operations, the other value is still
        // missing them. Applying the new operation to it right away would change the order in
        // which the two values see the operations.
        let exclusive = if self.writer.op_buffer.is_empty() {
            Arc::get_mut(&mut self.writer.shared)
        } else {
            None
        };
        if let Some(inner) = exclusive {
            inner.value_1.get_mut().absorb(operation.clone());
            inner.value_2.get_mut().absorb(operation);
        } else {
            self.writer.op_buffer.push_back(operation.clone());
            self.get_data_mut().absorb(operation);
        }
    }
}
impl<T, O> Drop for WriteGuard<'_, T, O> {
fn drop(&mut self) {
self.writer.swap();
}
}
/// The writing half. It is not realtime safe and can change the internal `T` value.
#[derive(Debug)]
pub struct Writer<T, O> {
// state and values shared with the Reader
shared: Arc<Shared<T>>,
// Sets which value the next write is applied to.
// write_ptr doesn't need to be atomic, as it only changes when the Writer itself swaps.
write_ptr: Ptr,
// Operations that still have to be applied to the other value.
// The buffer is pushed at the back and popped at the front.
op_buffer: VecDeque<O>,
}
impl<T, O> Writer<T, O> {
    /// Swaps the read and write values. If no changes were made since the last swap nothing happens. Never blocks.
    ///
    /// Not public: swapping is meant to happen only when a `WriteGuard` is dropped.
    fn swap(&mut self) {
        if !self.op_buffer.is_empty() {
            let state = &self.shared.state;
            // point the ptr bit at the value that was just written; the read bits stay untouched
            match self.write_ptr {
                Ptr::Value1 => state.fetch_and(0b011, Ordering::Release),
                Ptr::Value2 => state.fetch_or(0b100, Ordering::Release),
            };
            self.write_ptr.switch();
        }
    }

    /// Gets a Reader if none exists.
    ///
    /// Returns `None` while another handle to the shared data is still alive.
    pub fn build_reader(&mut self) -> Option<Reader<T>> {
        // the Writer holding the only handle means that there is no Reader
        let no_reader = Arc::get_mut(&mut self.shared).is_some();
        no_reader.then(|| Reader {
            inner: Arc::clone(&self.shared),
        })
    }

    /// The value returned may be newer than the version the Reader is currently seeing.
    /// This value will be written to next.
    #[must_use]
    pub fn read(&self) -> &T {
        let value = self.shared.get_value(self.write_ptr).get();
        // SAFETY: Only the WriteGuard can write to the values / create mut refs to them.
        // The WriteGuard holds a mut ref to the Writer, so this function can't be called while a WriteGuard exists.
        // This means that reading them / creating refs is safe to do.
        unsafe { value.as_ref().unwrap_unchecked() }
    }
}
impl<T: Absorb<O>, O> Writer<T, O> {
    /// Blocks if the Reader has a `ReadGuard` pointing to the old value.
    ///
    /// Uses a spinlock, because for anything else the OS needs to be involved. The Reader can't talk to the OS.
    pub fn lock(&mut self) -> WriteGuard<'_, T, O> {
        let backoff = crossbeam_utils::Backoff::new();
        // Spin with relaxed loads until the state allows writing to the current write_ptr value.
        while !ReadState::from(self.shared.state.load(Ordering::Relaxed)).can_write(self.write_ptr)
        {
            backoff.snooze();
        }
        // Only the last load, the one that ended the loop, has to be acquire. The fence
        // upgrades it, while all loads before stay relaxed.
        fence(Ordering::Acquire);
        // SAFETY: The spin loop before is only exited once the ReadState allows writing to the
        // current write_ptr value.
        unsafe { WriteGuard::new(self) }
    }

    /// Doesn't block. Returns `None` if the Reader has a `ReadGuard` pointing to the old value.
    pub fn try_lock(&mut self) -> Option<WriteGuard<'_, T, O>> {
        let read_state = ReadState::from(self.shared.state.load(Ordering::Acquire));
        if !read_state.can_write(self.write_ptr) {
            return None;
        }
        // SAFETY: the ReadState that was just loaded allows writing to the write_ptr value
        unsafe { Some(WriteGuard::new(self)) }
    }
}
impl<T: Clone, O> Writer<T, O> {
    /// Creates a new Writer by cloning the value once to get two values.
    ///
    /// The shared data is initialized in place, field by field, inside a fresh allocation.
    pub fn new(value: T) -> Self {
        let mut uninit: Arc<MaybeUninit<Shared<T>>> = Arc::new_uninit();
        // SAFETY: the Arc was just created, so this is the only handle to it
        let slot = unsafe { Arc::get_mut(&mut uninit).unwrap_unchecked() }.as_mut_ptr();
        // SAFETY: `slot` points to the allocation of the Arc. Only a raw pointer to the field is
        // created and the write doesn't read the uninitialized memory.
        unsafe { (&raw mut (*slot).state).write(AtomicU8::new(0b000)) };
        // SAFETY: same as for the state, the field is only written through a raw pointer
        unsafe { (&raw mut (*slot).value_1).write(UnsafeCell::new(value.clone())) };
        // SAFETY: same as for the state, the field is only written through a raw pointer
        unsafe { (&raw mut (*slot).value_2).write(UnsafeCell::new(value)) };
        // SAFETY: all fields of shared were initialized
        let shared: Arc<Shared<T>> = unsafe { uninit.assume_init() };
        Self {
            shared,
            // the state starts with the ptr bit cleared, so the first write goes to the second value
            write_ptr: Ptr::Value2,
            op_buffer: VecDeque::new(),
        }
    }
}
impl<T: Default, O> Default for Writer<T, O> {
    /// Creates a new Writer by calling `T::default()` twice to create the two values.
    ///
    /// The Default impl of T needs to give the same result every time. A differing result
    /// doesn't lead to UB, but turns the library basically useless.
    fn default() -> Self {
        let mut uninit: Arc<MaybeUninit<Shared<T>>> = Arc::new_uninit();
        // SAFETY: the Arc was just created, so this is the only handle to it
        let slot = unsafe { Arc::get_mut(&mut uninit).unwrap_unchecked() }.as_mut_ptr();
        // SAFETY: `slot` points to the allocation of the Arc. Only a raw pointer to the field is
        // created and the write doesn't read the uninitialized memory.
        unsafe { (&raw mut (*slot).state).write(AtomicU8::new(0b000)) };
        // SAFETY: same as for the state, the field is only written through a raw pointer
        unsafe { (&raw mut (*slot).value_1).write(UnsafeCell::new(T::default())) };
        // SAFETY: same as for the state, the field is only written through a raw pointer
        unsafe { (&raw mut (*slot).value_2).write(UnsafeCell::new(T::default())) };
        // SAFETY: all fields of shared were initialized
        let shared: Arc<Shared<T>> = unsafe { uninit.assume_init() };
        Self {
            shared,
            // the state starts with the ptr bit cleared, so the first write goes to the second value
            write_ptr: Ptr::Value2,
            op_buffer: VecDeque::new(),
        }
    }
}