real_time/
writer.rs

1use {
2    crate::{
3        backoff::Backoff,
4        sync::{
5            atomic::{AtomicU8, Ordering},
6            Arc,
7        },
8        PhantomUnsync,
9    },
10    std::{cell::UnsafeCell, marker::PhantomData, mem::MaybeUninit},
11};
12
13/// A shared value that can read on a non-real-time thread.
14pub struct LockingReader<T> {
15    shared: Arc<Shared<T>>,
16    _marker: PhantomUnsync,
17}
18
19/// A shared value that can be mutated on the real-time thread without blocking.
20pub struct RealtimeWriter<T> {
21    shared: Arc<Shared<T>>,
22    _marker: PhantomUnsync,
23}
24
25/// Creates a shared value that can be mutated on the real-time thread without
26/// blocking.
27pub fn writable<T>(value: T) -> (RealtimeWriter<T>, LockingReader<T>)
28where
29    T: Send,
30{
31    let shared = Arc::new(Shared {
32        values: [
33            UnsafeCell::new(MaybeUninit::uninit()),
34            UnsafeCell::new(MaybeUninit::new(value)),
35        ],
36        control: AtomicU8::new(ControlBits::default().into()),
37    });
38
39    (
40        RealtimeWriter {
41            shared: Arc::clone(&shared),
42            _marker: PhantomData,
43        },
44        LockingReader {
45            shared,
46            _marker: PhantomData,
47        },
48    )
49}
50
51struct Shared<T> {
52    values: [UnsafeCell<MaybeUninit<T>>; 2],
53    control: AtomicU8,
54}
55
56impl<T> Shared<T> {
57    /// The caller must ensure that no mutations or mutable aliases can occur
58    /// for the value at the slot being accessed.
59    #[inline]
60    unsafe fn get(&self, index: usize) -> &MaybeUninit<T> {
61        debug_assert!(index < self.values.len());
62        &*self.values[index].get()
63    }
64
65    /// The caller must have exclusive access to the value in the slot being
66    /// accessed.
67    #[inline]
68    unsafe fn get_mut(&self, index: usize) -> &mut MaybeUninit<T> {
69        debug_assert!(index < self.values.len());
70        &mut *self.values[index].get()
71    }
72}
73
74impl<T> Drop for Shared<T> {
75    fn drop(&mut self) {
76        let control: ControlBits = self.control.load(Ordering::SeqCst).into();
77
78        for slot in 0..self.values.len() {
79            if control.is_slot_initialised(slot) {
80                // SAFETY: We have unique access to both slots, and the slot contains a
81                // previously initialised value.
82                unsafe {
83                    self.get_mut(slot).assume_init_drop();
84                };
85            }
86        }
87    }
88}
89
90unsafe impl<T> Send for Shared<T> {}
91unsafe impl<T> Sync for Shared<T> {}
92
/// The individual flags packed into the shared control byte.
#[repr(u8)]
enum ControlBit {
    /// Index (0 or 1) of the slot currently assigned to the writer.
    WriteSlotIndex = 0b0001,
    /// Set while the writer is part-way through writing to its slot.
    IsWriterBusy = 0b0010,
    /// Set when the writer has published a value the reader has not yet
    /// picked up.
    HasNewData = 0b0100,
    /// Set once slot 0 holds an initialised value.
    IsSlot0Initialised = 0b1000,
}

impl From<ControlBit> for u8 {
    fn from(bit: ControlBit) -> u8 {
        bit as u8
    }
}

/// A snapshot of the control byte with helpers for querying and deriving
/// updated snapshots.
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
struct ControlBits(u8);

impl ControlBits {
    /// Returns `true` if `bit` is set.
    fn is_set(&self, bit: ControlBit) -> bool {
        (self.0 & u8::from(bit)) != 0
    }

    /// Returns a copy with `bit` set.
    fn set(self, bit: ControlBit) -> Self {
        Self(self.0 | u8::from(bit))
    }

    /// Returns a copy with `bit` cleared.
    fn clear(self, bit: ControlBit) -> Self {
        Self(self.0 & !u8::from(bit))
    }

    /// Returns a copy with `bit` toggled.
    fn flip(self, bit: ControlBit) -> Self {
        Self(self.0 ^ u8::from(bit))
    }

    /// Returns a copy in which every bit other than `bit` is cleared.
    fn bitwise_and(self, bit: ControlBit) -> Self {
        Self(self.0 & u8::from(bit))
    }

    /// Index of the slot the writer writes to.
    fn write_slot_index(self) -> usize {
        usize::from(self.bitwise_and(ControlBit::WriteSlotIndex).0)
    }

    /// Index of the slot the reader reads from; always the other slot.
    fn read_slot_index(self) -> usize {
        self.write_slot_index() ^ 1
    }

    /// Reports whether `slot` holds an initialised value.
    ///
    /// Slot 1 is always reported as initialised; slot 0 is tracked by
    /// [`ControlBit::IsSlot0Initialised`].
    fn is_slot_initialised(self, slot: usize) -> bool {
        debug_assert!(slot < 2);
        slot == 1 || (slot == 0 && self.is_set(ControlBit::IsSlot0Initialised))
    }

    /// Returns a copy that records `slot` as initialised.
    ///
    /// Only slot 0 is tracked, so any other slot leaves the bits unchanged.
    fn set_slot_initialised(self, slot: usize) -> Self {
        debug_assert!(slot < 2);
        if slot == 0 {
            self.set(ControlBit::IsSlot0Initialised)
        } else {
            self
        }
    }
}

impl From<ControlBits> for u8 {
    fn from(bits: ControlBits) -> u8 {
        bits.0
    }
}

impl From<u8> for ControlBits {
    fn from(byte: u8) -> ControlBits {
        ControlBits(byte)
    }
}
162
163impl<T> LockingReader<T> {
164    fn update(&self) -> &T {
165        let mut control: ControlBits = self.shared.control.load(Ordering::SeqCst).into();
166
167        let read_slot_index = control
168            .is_set(ControlBit::HasNewData)
169            .then(|| {
170                let backoff = Backoff::default();
171                loop {
172                    // Wait until the writer has finished writing...
173                    let current = control.clear(ControlBit::IsWriterBusy);
174
175                    // ...and then swap the read and write slots, also clearing the new data
176                    // bit.
177                    let new = current
178                        .clear(ControlBit::HasNewData)
179                        .flip(ControlBit::WriteSlotIndex);
180
181                    match self
182                        .shared
183                        .control
184                        .compare_exchange_weak(
185                            current.into(),
186                            new.into(),
187                            Ordering::AcqRel,
188                            Ordering::Relaxed,
189                        )
190                        .map(ControlBits)
191                    {
192                        Ok(previous) => {
193                            return previous.write_slot_index();
194                        }
195                        Err(actual) => {
196                            control = actual.into();
197                            backoff.spin();
198                        }
199                    }
200                }
201            })
202            .unwrap_or(control.read_slot_index());
203
204        // SAFETY: There are no mutable aliases to the read slot.
205        let read_slot = unsafe { self.shared.get(read_slot_index) };
206
207        // SAFETY: Value is in an initialised state as it is either the value that was
208        // initialised in the constructor, or an initialised value written by the writer
209        // before it notified us of new data.
210        unsafe { read_slot.assume_init_ref() }
211    }
212
213    /// Get the latest value.
214    pub fn get(&self) -> T
215    where
216        T: Clone,
217    {
218        self.update().clone()
219    }
220
221    /// Get a reference to the latest value.
222    ///
223    /// This method requires a unique borrow of the reader to ensure that the
224    /// reference isn't invalidated.
225    pub fn get_ref(&mut self) -> &T {
226        self.update()
227    }
228}
229
230impl<T> RealtimeWriter<T> {
231    /// Set the shared value and make the update immediately available to any
232    /// non-real-time threads.
233    pub fn set<V>(&self, value: V)
234    where
235        V: Into<T>,
236        T: Send,
237    {
238        // Set the busy bit to prevent the reader from swapping the slots whilst we are
239        // writing.
240        let control: ControlBits = self
241            .shared
242            .control
243            .fetch_or(ControlBit::IsWriterBusy.into(), Ordering::Acquire)
244            .into();
245
246        let write_slot_index = control.write_slot_index();
247
248        {
249            // SAFETY: We have unique access to the value in the write slot, as we have
250            // signalled to the reader that we are currently busy, preventing it from
251            // swapping the slots.
252            let write_slot = unsafe { self.shared.get_mut(write_slot_index) };
253
254            if control.is_slot_initialised(write_slot_index) {
255                // SAFETY: The value is in an initialised state as each slot has been written to
256                // as indicated by the control bit.
257                unsafe {
258                    write_slot.assume_init_drop();
259                }
260            }
261
262            write_slot.write(value.into());
263        }
264
265        let control = control
266            .set_slot_initialised(write_slot_index)
267            .set(ControlBit::HasNewData);
268
269        // Tell the reader that new data is available and clear the busy bit.
270        self.shared.control.store(control.into(), Ordering::Release);
271    }
272}
273
#[cfg(test)]
mod test {
    use {
        super::*,
        static_assertions::{assert_impl_all, assert_not_impl_any},
        std::{sync::atomic::AtomicUsize, thread},
    };

    // Both handles can be moved to another thread, but neither may be shared
    // between threads or duplicated.
    assert_impl_all!(RealtimeWriter<i32>: Send);
    assert_not_impl_any!(RealtimeWriter<i32>: Sync, Copy, Clone);

    assert_impl_all!(LockingReader<i32>: Send);
    assert_not_impl_any!(LockingReader<i32>: Sync, Copy, Clone);

    #[test]
    fn determining_the_read_and_write_indexes() {
        let control = ControlBits(0b000);

        assert_eq!(control.write_slot_index(), 0);
        assert_eq!(control.read_slot_index(), 1);

        let control = control.flip(ControlBit::WriteSlotIndex);

        assert_eq!(control.write_slot_index(), 1);
        assert_eq!(control.read_slot_index(), 0);
    }

    #[test]
    fn setting_bits_in_the_control() {
        let control = ControlBits(0b000);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(!control.is_set(ControlBit::IsWriterBusy));
        assert!(!control.is_set(ControlBit::HasNewData));

        let control = control.set(ControlBit::IsWriterBusy);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(control.is_set(ControlBit::IsWriterBusy));
        assert!(!control.is_set(ControlBit::HasNewData));

        let control = control
            .clear(ControlBit::IsWriterBusy)
            .set(ControlBit::HasNewData);

        assert!(!control.is_set(ControlBit::WriteSlotIndex));
        assert!(!control.is_set(ControlBit::IsWriterBusy));
        assert!(control.is_set(ControlBit::HasNewData));
    }

    #[test]
    fn managing_the_control_bits() {
        let (writer, reader) = writable(0);
        let get_controls_bits =
            |writer: &RealtimeWriter<_>| ControlBits(writer.shared.control.load(Ordering::SeqCst));

        {
            let initial_control_bits = get_controls_bits(&writer);
            assert!(!initial_control_bits.is_set(ControlBit::IsWriterBusy));
            assert!(!initial_control_bits.is_set(ControlBit::HasNewData));
            assert!(!initial_control_bits.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(initial_control_bits.write_slot_index(), 0);
            assert_eq!(initial_control_bits.read_slot_index(), 1);
        }

        writer.set(1);

        {
            let control_bits_after_set = get_controls_bits(&writer);
            assert!(!control_bits_after_set.is_set(ControlBit::IsWriterBusy));
            assert!(control_bits_after_set.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_set.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_set.write_slot_index(), 0);
            assert_eq!(control_bits_after_set.read_slot_index(), 1);
        }

        let _value = reader.get();

        {
            let control_bits_after_get = get_controls_bits(&writer);
            assert!(!control_bits_after_get.is_set(ControlBit::IsWriterBusy));
            assert!(!control_bits_after_get.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_get.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_get.write_slot_index(), 1);
            assert_eq!(control_bits_after_get.read_slot_index(), 0);
        }

        writer.set(2);

        {
            let control_bits_after_set = get_controls_bits(&writer);
            assert!(!control_bits_after_set.is_set(ControlBit::IsWriterBusy));
            assert!(control_bits_after_set.is_set(ControlBit::HasNewData));
            assert!(control_bits_after_set.is_set(ControlBit::IsSlot0Initialised));
            assert_eq!(control_bits_after_set.write_slot_index(), 1);
            assert_eq!(control_bits_after_set.read_slot_index(), 0);
        }
    }

    #[test]
    fn multiple_reads_before_new_writes_dont_read_old_data() {
        let (writer, reader) = writable(0);

        assert_eq!(reader.get(), 0);

        writer.set(1);

        assert_eq!(reader.get(), 1);
        assert_eq!(reader.get(), 1);

        writer.set(2);

        assert_eq!(reader.get(), 2);
        assert_eq!(reader.get(), 2);
    }

    #[test]
    fn reading_and_writing_simultaneously() {
        let (writer, mut reader) = writable(0);

        let reader = thread::spawn(move || {
            let mut last_value = reader.get();
            loop {
                let value = *reader.get_ref();

                assert!(value <= 1000);
                assert!(value >= 0);
                // The writer publishes increasing values, so the reader must
                // never observe one older than the value it saw last time.
                assert!(value >= last_value);

                if value == 1000 {
                    break;
                }

                last_value = value;
            }
        });

        for i in 0..=1000 {
            writer.set(i);
        }

        reader.join().unwrap();
    }

    #[test]
    fn all_values_get_dropped() {
        // Counts how many times values of this type have been dropped.
        struct DroppableValue(Arc<AtomicUsize>);
        impl DroppableValue {
            fn new(drop_count: &Arc<AtomicUsize>) -> Self {
                DroppableValue(Arc::clone(drop_count))
            }
        }
        impl Drop for DroppableValue {
            fn drop(&mut self) {
                self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        for number_of_sets in 0..3 {
            {
                let drop_count = Arc::new(AtomicUsize::new(0));

                {
                    let (writer, _) = writable(DroppableValue::new(&drop_count));

                    for _ in 0..number_of_sets {
                        writer.set(DroppableValue::new(&drop_count));
                    }
                }

                // The initial value plus one value per call to `set`.
                let expected_drop_count = number_of_sets + 1;
                assert_eq!(drop_count.load(Ordering::SeqCst), expected_drop_count);
            }
        }
    }
}