use {
    crate::{
        backoff::Backoff,
        sync::{
            atomic::{AtomicU8, Ordering},
            Arc,
        },
        PhantomUnsync,
    },
    std::{cell::UnsafeCell, marker::PhantomData, mem::MaybeUninit},
};

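/// Reads values published by the paired [`RealtimeWriter`]. Reading may spin
/// briefly while the writer is mid-write, so it is not realtime-safe.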
pub struct LockingReader<T> {
    shared: Arc<Shared<T>>,
    _marker: PhantomUnsync,
}

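/// Publishes values without blocking, making it suitable for use on a
/// realtime thread. Values are handed over to the paired [`LockingReader`].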
pub struct RealtimeWriter<T> {
    shared: Arc<Shared<T>>,
    _marker: PhantomUnsync,
}

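/// Creates a connected writer/reader pair seeded with `value`. The seed goes
/// into slot 1 so the reader has something to return before the first `set`;
/// slot 0 starts uninitialised and is tracked by a control bit.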
pub fn writable<T>(value: T) -> (RealtimeWriter<T>, LockingReader<T>)
where
    T: Send,
{
    let shared = Arc::new(Shared {
        values: [
            UnsafeCell::new(MaybeUninit::uninit()),
            UnsafeCell::new(MaybeUninit::new(value)),
        ],
        control: AtomicU8::new(ControlBits::default().into()),
    });

    (
        RealtimeWriter {
            shared: Arc::clone(&shared),
            _marker: PhantomData,
        },
        LockingReader {
            shared,
            _marker: PhantomData,
        },
    )
}

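/// The double buffer shared by writer and reader: two value slots plus a
/// control byte encoding which slot the writer owns and whether new data is
/// pending (see [`ControlBit`]).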
struct Shared<T> {
    values: [UnsafeCell<MaybeUninit<T>>; 2],
    control: AtomicU8,
}

impl<T> Shared<T> {
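    /// # Safety
    ///
    /// The caller must ensure no mutable reference to the same slot is alive
    /// while the returned shared reference exists.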
    #[inline]
    unsafe fn get(&self, index: usize) -> &MaybeUninit<T> {
        debug_assert!(index < self.values.len());
        &*self.values[index].get()
    }

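    /// # Safety
    ///
    /// The caller must ensure the returned mutable reference is the only
    /// reference to the slot while it is alive.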
    #[inline]
    unsafe fn get_mut(&self, index: usize) -> &mut MaybeUninit<T> {
        debug_assert!(index < self.values.len());
        &mut *self.values[index].get()
    }
}

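// On drop, only the slots recorded as initialised in the control byte are
// dropped in place; slot 0 may never have been written.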
impl<T> Drop for Shared<T> {
    fn drop(&mut self) {
        let control: ControlBits = self.control.load(Ordering::SeqCst).into();

        for slot in 0..self.values.len() {
            if control.is_slot_initialised(slot) {
                unsafe {
                    self.get_mut(slot).assume_init_drop();
                }
            }
        }
    }
}

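// SAFETY: the control byte ensures the writer and reader never touch the same
// slot at the same time, so sharing `Shared` across threads only ever moves
// values of `T` between threads; hence the `T: Send` bound.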
unsafe impl<T: Send> Send for Shared<T> {}
unsafe impl<T: Send> Sync for Shared<T> {}

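/// Individual flags packed into the shared control byte. `WriteSlotIndex` is
/// the low bit and doubles as the index of the slot the writer owns; the
/// reader reads the other slot.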
#[repr(u8)]
enum ControlBit {
    WriteSlotIndex = 0b001,
    IsWriterBusy = 0b010,
    HasNewData = 0b100,
    IsSlot0Initialised = 0b1000,
}

impl From<ControlBit> for u8 {
    fn from(bit: ControlBit) -> u8 {
        bit as u8
    }
}

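/// A convenience wrapper over the raw control byte providing bit-level
/// accessors. The update methods take and return `self` by value so changes
/// can be chained before a single atomic store.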
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
struct ControlBits(u8);

impl ControlBits {
    fn is_set(&self, bit: ControlBit) -> bool {
        self.0 & (bit as u8) != 0
    }
    fn set(self, bit: ControlBit) -> Self {
        (self.0 | (bit as u8)).into()
    }
    fn clear(self, bit: ControlBit) -> Self {
        (self.0 & !(bit as u8)).into()
    }
    fn flip(self, bit: ControlBit) -> Self {
        (self.0 ^ (bit as u8)).into()
    }
    fn bitwise_and(self, bit: ControlBit) -> Self {
        (self.0 & (bit as u8)).into()
    }
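    // The write slot index is the low bit itself; the read slot is always the
    // other one, obtained by flipping that bit.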
    fn write_slot_index(self) -> usize {
        self.bitwise_and(ControlBit::WriteSlotIndex).0 as usize
    }
    fn read_slot_index(self) -> usize {
        self.bitwise_and(ControlBit::WriteSlotIndex)
            .flip(ControlBit::WriteSlotIndex)
            .0 as usize
    }
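    // Slot 1 is initialised at construction time (see `writable`), so only
    // slot 0 needs an explicit tracking bit.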
    fn is_slot_initialised(self, slot: usize) -> bool {
        debug_assert!(slot < 2);
        match slot {
            0 => self.is_set(ControlBit::IsSlot0Initialised),
            1 => true,
            _ => false,
        }
    }
    fn set_slot_initialised(self, slot: usize) -> Self {
        debug_assert!(slot < 2);
        match slot {
            0 => self.set(ControlBit::IsSlot0Initialised),
            _ => self,
        }
    }
}

impl From<ControlBits> for u8 {
    fn from(bits: ControlBits) -> u8 {
        bits.0
    }
}

impl From<u8> for ControlBits {
    fn from(byte: u8) -> ControlBits {
        ControlBits(byte)
    }
}

impl<T> LockingReader<T> {
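    /// Returns a reference to the freshest value. If the writer has published
    /// new data, the slots are swapped with a CAS loop first; otherwise the
    /// current read slot is returned as-is.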
    fn update(&self) -> &T {
        let mut control: ControlBits = self.shared.control.load(Ordering::SeqCst).into();

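        // With new data pending, claim it by flipping the write slot index
        // and clearing `HasNewData`. The CAS only succeeds while the writer
        // is not mid-write, so we spin until it finishes.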
        let read_slot_index = control
            .is_set(ControlBit::HasNewData)
            .then(|| {
                let backoff = Backoff::default();
                loop {
                    let current = control.clear(ControlBit::IsWriterBusy);

                    let new = current
                        .clear(ControlBit::HasNewData)
                        .flip(ControlBit::WriteSlotIndex);

                    match self
                        .shared
                        .control
                        .compare_exchange_weak(
                            current.into(),
                            new.into(),
                            Ordering::AcqRel,
                            Ordering::Relaxed,
                        )
                        .map(ControlBits)
                    {
                        Ok(previous) => {
                            return previous.write_slot_index();
                        }
                        Err(actual) => {
                            control = actual.into();
                            backoff.spin();
                        }
                    }
                }
            })
            .unwrap_or(control.read_slot_index());

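        // SAFETY: the slot we read is never the writer's slot, and it is
        // guaranteed to be initialised: either it is slot 1 (seeded in
        // `writable`) or it was written before `HasNewData` was raised.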
        let read_slot = unsafe { self.shared.get(read_slot_index) };

        unsafe { read_slot.assume_init_ref() }
    }

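    /// Returns a clone of the latest value.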
    pub fn get(&self) -> T
    where
        T: Clone,
    {
        self.update().clone()
    }

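    /// Returns a reference to the latest value without cloning. Takes
    /// `&mut self` so the borrow prevents a further update while the
    /// returned reference is alive.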
    pub fn get_ref(&mut self) -> &T {
        self.update()
    }
}

impl<T> RealtimeWriter<T> {
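    /// Stores a new value for the reader to pick up, replacing any value the
    /// reader has not consumed yet. The method never blocks: marking the
    /// writer busy pins the write slot, so the write proceeds immediately
    /// (note that dropping the displaced value runs `T`'s destructor).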
    pub fn set<V>(&self, value: V)
    where
        V: Into<T>,
        T: Send,
    {
        let control: ControlBits = self
            .shared
            .control
            .fetch_or(ControlBit::IsWriterBusy.into(), Ordering::Acquire)
            .into();

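        // While `IsWriterBusy` is set the reader's CAS cannot succeed, so the
        // write slot index is stable and we have exclusive access to it.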
        let write_slot_index = control.write_slot_index();

        {
            let write_slot = unsafe { self.shared.get_mut(write_slot_index) };

            if control.is_slot_initialised(write_slot_index) {
                unsafe {
                    write_slot.assume_init_drop();
                }
            }

            write_slot.write(value.into());
        }

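        // Publish: the Release store clears `IsWriterBusy` (the stored value
        // derives from the pre-`fetch_or` control byte), records the slot as
        // initialised, and raises `HasNewData` for the reader.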
        let control = control
            .set_slot_initialised(write_slot_index)
            .set(ControlBit::HasNewData);

        self.shared.control.store(control.into(), Ordering::Release);
    }
}

#[cfg(test)]
mod test {
    use {
        super::*,
        static_assertions::{assert_impl_all, assert_not_impl_any},
        std::{sync::atomic::AtomicUsize, thread},
    };

    assert_impl_all!(RealtimeWriter<i32>: Send);
    assert_not_impl_any!(RealtimeWriter<i32>: Sync, Copy, Clone);

    assert_impl_all!(LockingReader<i32>: Send);
    assert_not_impl_any!(LockingReader<i32>: Sync, Copy, Clone);

    #[test]
    fn determining_the_read_and_write_indexes() {
        let control = ControlBits(0b000);

        assert_eq!(control.write_slot_index(), 0);
        assert_eq!(control.read_slot_index(), 1);

        let control = control.flip(ControlBit::WriteSlotIndex);

        assert_eq!(control.write_slot_index(), 1);
        assert_eq!(control.read_slot_index(), 0);
    }

    #[test]
    fn setting_bits_in_the_control() {
        let control = ControlBits(0b000);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(!control.is_set(ControlBit::IsWriterBusy));
        assert!(!control.is_set(ControlBit::HasNewData));

        let control = control.set(ControlBit::IsWriterBusy);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(control.is_set(ControlBit::IsWriterBusy));
        assert!(!control.is_set(ControlBit::HasNewData));

        let control = control
            .clear(ControlBit::IsWriterBusy)
            .set(ControlBit::HasNewData);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(!control.is_set(ControlBit::IsWriterBusy));
        assert!(control.is_set(ControlBit::HasNewData));
    }

    #[test]
    fn managing_the_control_bits() {
        let (writer, reader) = writable(0);
        let get_control_bits =
            |writer: &RealtimeWriter<_>| ControlBits(writer.shared.control.load(Ordering::SeqCst));

        {
            let initial_control_bits = get_control_bits(&writer);
            assert!(!initial_control_bits.is_set(ControlBit::IsWriterBusy));
            assert!(!initial_control_bits.is_set(ControlBit::HasNewData));
            assert!(!initial_control_bits.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(initial_control_bits.write_slot_index(), 0);
            assert_eq!(initial_control_bits.read_slot_index(), 1);
        }

        writer.set(1);

        {
            let control_bits_after_set = get_control_bits(&writer);
            assert!(!control_bits_after_set.is_set(ControlBit::IsWriterBusy));
            assert!(control_bits_after_set.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_set.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_set.write_slot_index(), 0);
            assert_eq!(control_bits_after_set.read_slot_index(), 1);
        }

        let _value = reader.get();

        {
            let control_bits_after_get = get_control_bits(&writer);
            assert!(!control_bits_after_get.is_set(ControlBit::IsWriterBusy));
            assert!(!control_bits_after_get.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_get.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_get.write_slot_index(), 1);
            assert_eq!(control_bits_after_get.read_slot_index(), 0);
        }

        writer.set(2);

        {
            let control_bits_after_set = get_control_bits(&writer);
            assert!(!control_bits_after_set.is_set(ControlBit::IsWriterBusy));
            assert!(control_bits_after_set.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_set.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_set.write_slot_index(), 1);
            assert_eq!(control_bits_after_set.read_slot_index(), 0);
        }
    }

    #[test]
    fn multiple_reads_before_new_writes_dont_read_old_data() {
        let (writer, reader) = writable(0);

        assert_eq!(reader.get(), 0);

        writer.set(1);

        assert_eq!(reader.get(), 1);
        assert_eq!(reader.get(), 1);

        writer.set(2);

        assert_eq!(reader.get(), 2);
        assert_eq!(reader.get(), 2);
    }

    #[test]
    fn reading_and_writing_simultaneously() {
        let (writer, mut reader) = writable(0);

        let reader = thread::spawn(move || {
            let mut last_value = reader.get();
            loop {
                let value = *reader.get_ref();

                assert!(value <= 1000);
                assert!(value >= 0);
                assert!(value >= last_value);

                if value == 1000 {
                    break;
                }

                last_value = value;
            }
        });

        for i in 0..=1000 {
            writer.set(i);
        }

        reader.join().unwrap();
    }

    #[test]
    fn all_values_get_dropped() {
        struct DroppableValue(Arc<AtomicUsize>);
        impl DroppableValue {
            fn new(drop_count: &Arc<AtomicUsize>) -> Self {
                DroppableValue(Arc::clone(drop_count))
            }
        }
        impl Drop for DroppableValue {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        for number_of_sets in 0..3 {
            let drop_count = Arc::new(AtomicUsize::new(0));

            {
                let (writer, _) = writable(DroppableValue::new(&drop_count));

                for _ in 0..number_of_sets {
                    writer.set(DroppableValue::new(&drop_count));
                }
            }

            let expected_drop_count = number_of_sets + 1;
            assert_eq!(drop_count.load(Ordering::SeqCst), expected_drop_count);
        }
    }
}