atomic_pingpong/
lib.rs

1//! Lightweight ping-pong buffer intended for no_std targets.
2//!
3//! A ping-pong buffer is a two-element buffer which allows simultaneous access
4//! by a single producer and a single consumer.  One element is reserved for
5//! writing by the producer, and the other element is reserved for reading by
6//! the consumer. When writing and reading are finished, the roles of the two
7//! elements are swapped (i.e. the one which was written will be next to be
8//! read, and the one which was read will be next to be overwritten). This
9//! approach avoids the need for memory copies, which improves performance when
10//! the element size is large.
11//!
12//! The ping-pong buffer is specifically designed to allow simultaneous reading
13//! and writing.  However, the roles of the two elements can only be safely
14//! swapped when neither reading or writing is in progress.  It is the user's
15//! responsibility to ensure that the timing of reads and writes allows for this
16//! to happen.  If reads and writes are interleaved such that one or the other
17//! is always in progress, then the roles of the buffer elements will never be
18//! able to swap, and the reader will continue to read an old value rather than
19//! the new values which are being written.
20//!
21//! A reference for reading is acquired by calling `Buffer<T>::read()`, and a
22//! mutable reference for writing is acquired by calling `Buffer<T>::write()`.
23//! The types returned are smart pointers (`Ref<T>` and `RefMut<T>`,
24//! respectively), which automatically update the state of the ping-pong buffer
25//! when they are dropped. Attempting to acquire a second reference for reading
26//! or writing will fail if the first reference of that type has not been dropped.
27//! To opt out of automatic reference management, a set of unsafe access functions
28//! are available: `read_unchecked()`, `write_unchecked()`, `release_read()`, and
29//! `release_write()`.  These functions provide reduced runtime overhead but, of
30//! course, care is required to use them safely.
31//!
32//! Ordinarily, calls to `read()` and `write()` are as permissive as possible:
33//! `read()` succeeds unless reading is already in progress, and `write()`
34//! succeeds unless writing is already in progress. Thus, depending on the
35//! timing of `read()` and `write()` calls, certain data which is written may
36//! never be read, and other data which is written may be read multiple times.
37//! (This is an important distinction between a ping-pong buffer and a FIFO
38//! ring buffer.) Alternative behavior is possible using the `read_once()`
39//! function, which only returns a `Ref<T>` if it points to data which has not
40//! yet been read, and the `write_no_discard()` function, which only returns a
41//! `RefMut<T>` if the buffer does not currently contain unread data.
42//!
43//! The memory footprint of a `Buffer<T>` is two of `T` plus one additional byte
44//! (an `AtomicU8`) which is used to synchronize access by the producer and
45//! consumer. The runtime overhead from this implementation is less than about
46//! twenty instructions to acquire or release a reference to the ping-pong
47//! buffer (assuming function inlining is enabled).  However, this crate can
48//! only be used on targets which include atomic compare/swap in their
49//! instruction sets.
50
51#![no_std]
52
53use core::cell::UnsafeCell;
54use core::mem::MaybeUninit;
55use core::sync::atomic;
56
57// Basically all of the tricky synchronization logic for the ping-pong buffer
58// lives in the BufferState implementation.  The buffer state is a bitmask
59// stored in an AtomicU8, rather than booleans or enums, in order to permit
60// atomic updtes to multiple flags at once.  The custom BufferState type
61// provides a convenient place for the associated functions and constants.
62struct BufferState(atomic::AtomicU8);
63
64/// A `Buffer<T>` consists of two copies of `T` plus one additional byte of
65/// state.
66pub struct Buffer<T> {
67    ping: UnsafeCell<T>,
68    pong: UnsafeCell<T>,
69    state: BufferState,
70}
71
72/// Smart pointer for reading from a `Buffer<T>`.
73/// Updates the buffer's state when dropped.
74pub struct Ref<'a, T> {
75    ptr: &'a T,
76    state: &'a BufferState,
77}
78
79/// Smart pointer for writing to a `Buffer<T>`.
80/// Updates the buffer's state when dropped.
81pub struct RefMut<'a, T> {
82    ptr: &'a mut T,
83    state: &'a BufferState,
84}
85
86impl BufferState {
87    // Bits of the bitmask:
88    const LOCK_READ: u8 = 0b0000_0001;
89    const LOCK_WRITE: u8 = 0b0000_0010;
90    const MODE_IS_FLIPPED: u8 = 0b0000_0100;
91    const WANT_MODE_CHANGE: u8 = 0b0000_1000;
92    const NEW_DATA_READY: u8 = 0b0001_0000;
93
94    const fn new() -> Self {
95        Self(atomic::AtomicU8::new(0))
96    }
97    /// If `condition()` is true, atomically update the state byte with
98    /// `action()` (using "Acquire" ordering) and return the current mode.
99    /// If `condition()` is false, return None without changing the state byte.
100    fn lock(&self, condition: fn(u8) -> bool, action: fn(u8) -> u8) -> Option<bool> {
101        let mut new_flags = None::<u8>;
102        let _ = self.0.fetch_update(
103            atomic::Ordering::Acquire,
104            atomic::Ordering::Relaxed,
105            |flags| {
106                if condition(flags) {
107                    new_flags = Some(action(flags));
108                }
109                new_flags
110            },
111        );
112        new_flags.map(|f| f & Self::MODE_IS_FLIPPED != 0)
113    }
114    fn lock_read(&self, allow_repeated: bool) -> Option<bool> {
115        self.lock(
116            if allow_repeated {
117                // allow reading the same data multiple times
118                |flags| flags & Self::LOCK_READ == 0
119            } else {
120                // only lock for reading if there is new unread data
121                |flags| flags & (Self::LOCK_READ | Self::NEW_DATA_READY) == Self::NEW_DATA_READY
122            },
123            |flags| (flags | Self::LOCK_READ) & !Self::NEW_DATA_READY,
124        )
125    }
126    fn lock_write(&self, allow_repeated: bool) -> Option<bool> {
127        self.lock(
128            if allow_repeated {
129                // allow overwriting data which has not yet been read
130                |flags| flags & Self::LOCK_WRITE == 0
131            } else {
132                // only lock for writing if there is not any unread data
133                |flags| flags & (Self::LOCK_WRITE | Self::NEW_DATA_READY) == 0
134            },
135            |flags| flags | Self::LOCK_WRITE,
136        )
137    }
138    /// Atomically update the state byte with `action()`
139    /// (using "Release" ordering).
140    fn release(&self, action: fn(u8) -> u8) {
141        let _ = self.0.fetch_update(
142            atomic::Ordering::Release,
143            atomic::Ordering::Relaxed,
144            |flags| Some(action(flags)),
145        ); // always Ok because the closure always returns Some
146    }
147    fn release_read(&self) {
148        self.release(|mut flags| {
149            flags &= !Self::LOCK_READ;
150            if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
151                flags &= !Self::WANT_MODE_CHANGE;
152                flags ^= Self::MODE_IS_FLIPPED;
153            }
154            flags
155        })
156    }
157    fn release_write(&self) {
158        self.release(|mut flags| {
159            flags &= !Self::LOCK_WRITE;
160            flags |= Self::NEW_DATA_READY;
161            if flags & Self::LOCK_READ == 0 {
162                flags &= !Self::WANT_MODE_CHANGE;
163                flags ^= Self::MODE_IS_FLIPPED;
164            } else {
165                flags |= Self::WANT_MODE_CHANGE;
166            }
167            flags
168        })
169    }
170    /// Atomically update the state byte with `action()`
171    /// (using "AcqRel" ordering) and return the current mode.
172    fn release_and_lock(&self, action: fn(u8) -> u8) -> bool {
173        let mut new_flags = 0u8;
174        let _ = self.0.fetch_update(
175            atomic::Ordering::AcqRel,
176            atomic::Ordering::Relaxed,
177            |flags| {
178                new_flags = action(flags);
179                Some(new_flags)
180            },
181        ); // always Ok because the closure always returns Some
182        new_flags & Self::MODE_IS_FLIPPED != 0
183    }
184    fn release_and_lock_read(&self) -> bool {
185        self.release_and_lock(|mut flags| {
186            flags |= Self::LOCK_READ;
187            flags &= !Self::NEW_DATA_READY;
188            if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
189                flags &= !Self::WANT_MODE_CHANGE;
190                flags ^= Self::MODE_IS_FLIPPED;
191            }
192            flags
193        })
194    }
195    fn release_and_lock_write(&self) -> bool {
196        self.release_and_lock(|mut flags| {
197            if flags & Self::LOCK_WRITE != 0 {
198                flags |= Self::NEW_DATA_READY;
199                if flags & Self::LOCK_READ == 0 {
200                    flags &= !Self::WANT_MODE_CHANGE;
201                    flags ^= Self::MODE_IS_FLIPPED;
202                } else {
203                    flags |= Self::WANT_MODE_CHANGE;
204                }
205            } else {
206                flags |= Self::LOCK_WRITE;
207            }
208            flags
209        })
210    }
211}
212
213impl<'a, T> Ref<'a, T> {
214    fn new(buf: &'a Buffer<T>, allow_repeated: bool) -> Option<Self> {
215        let mode = buf.state.lock_read(allow_repeated)?;
216        // If we get here, lock_read() succeeded, so it's safe to access the UnsafeCell
217        // which is currently designated for reading.
218        Some(Ref {
219            ptr: unsafe { &*buf.get_pointer(mode, true) },
220            state: &buf.state,
221        })
222    }
223}
224
225impl<'a, T> RefMut<'a, T> {
226    fn new(buf: &'a Buffer<T>, allow_repeated: bool) -> Option<Self> {
227        let mode = buf.state.lock_write(allow_repeated)?;
228        // If we get here, lock_write() succeeded, so it's safe to access the UnsafeCell
229        // which is currently designated for writing.
230        Some(RefMut {
231            ptr: unsafe { &mut *buf.get_pointer(mode, false) },
232            state: &buf.state,
233        })
234    }
235}
236
237impl<'a, T> Drop for Ref<'a, T> {
238    /// When a `Ref<'a, T>` is dropped, the state of the corresponding
239    /// `Buffer<T>` is automatically updated.
240    fn drop(&mut self) {
241        self.state.release_read();
242    }
243}
244
245impl<'a, T> Drop for RefMut<'a, T> {
246    /// When a `RefMut<'a, T>` is dropped, the state of the corresponding
247    /// `Buffer<T>` is automatically updated.
248    fn drop(&mut self) {
249        self.state.release_write();
250    }
251}
252
253impl<'a, T> core::ops::Deref for Ref<'a, T> {
254    /// `Ref<'a, T>` dereferences to a `T` element of the `Buffer<T>`.
255    type Target = T;
256    fn deref(&self) -> &T {
257        self.ptr
258    }
259}
260
261impl<'a, T> core::ops::Deref for RefMut<'a, T> {
262    /// `RefMut<'a, T>` dereferences to a `T` element of the `Buffer<T>`.
263    type Target = T;
264    /// Dereferences the value.
265    /// (Required in order to support `deref_mut`;
266    /// not likely to be useful on its own.)
267    fn deref(&self) -> &T {
268        self.ptr
269    }
270}
271
272impl<'a, T> core::ops::DerefMut for RefMut<'a, T> {
273    fn deref_mut(&mut self) -> &mut T {
274        self.ptr
275    }
276}
277
278impl<T: Copy> Buffer<T> {
279    /// Returns a new ping-pong buffer with the elements initialized to the
280    /// specified value.
281    pub const fn new(value: T) -> Self {
282        Buffer {
283            ping: UnsafeCell::new(value),
284            pong: UnsafeCell::new(value),
285            state: BufferState::new(),
286        }
287    }
288}
289
290impl<T: Default> Buffer<T> {
291    /// Returns a new ping-pong buffer with the elements initialized to their
292    /// default value.
293    pub fn default() -> Self {
294        Buffer {
295            ping: UnsafeCell::default(),
296            pong: UnsafeCell::default(),
297            state: BufferState::new(),
298        }
299    }
300}
301
302impl<T> Buffer<MaybeUninit<T>> {
303    /// Returns a new ping-pong buffer with uninitialized elements.
304    pub const fn uninit() -> Self {
305        Buffer {
306            ping: UnsafeCell::new(MaybeUninit::uninit()),
307            pong: UnsafeCell::new(MaybeUninit::uninit()),
308            state: BufferState::new(),
309        }
310    }
311}
312
313impl<T> Buffer<T> {
314    const fn get_pointer(&self, state: bool, read: bool) -> *mut T {
315        // state = false => read ping and write pong
316        // state = true  => read pong and write ping
317        (if state ^ read { &self.ping } else { &self.pong }).get()
318    }
319    /// Returns a `Ref<T>` smart pointer providing read-only access to the
320    /// ping-pong buffer, or `None` if the `Ref<T>` from a previous call has
321    /// not been dropped yet. If a call to `write` previously finished and
322    /// the ping-pong buffer was able to swap, the `T` element pointed to by
323    /// the reference will be a value that was previously written.
324    /// Otherwise, the `T` element will have its specified initial value based
325    /// on the function which was used to construct the ping-pong buffer.
326    pub fn read(&self) -> Option<Ref<T>> {
327        Ref::new(&self, true)
328    }
329    /// Ordinarily, the `read()` function allows the same data to be read
330    /// multiple times, and it allows the initial value to be read prior to
331    /// any calls to `write()`. In contrast, `read_once()` only returns a
332    /// `Ref<T>` if it points to new data which has been written into the
333    /// buffer and not yet read. Returns `None` if new data is not available
334    /// to read or if a previous `Ref<T>` has not yet been dropped.
335    pub fn read_once(&self) -> Option<Ref<T>> {
336        Ref::new(&self, false)
337    }
338    /// Returns a `RefMut<T>` smart pointer providing mutable access to the
339    /// ping-pong buffer, or `None` if the `RefMut<T>` from a previous call
340    /// has not been dropped yet. Due to the nature of the ping-pong buffer,
341    /// the `T` element pointed to by the reference may have an arbitrary
342    /// starting value prior to being overwritten by the caller.
343    pub fn write(&self) -> Option<RefMut<T>> {
344        RefMut::new(&self, true)
345    }
346    /// Ordinarily, the `write()` function allows an arbitrary number of
347    /// sequential writes, even if data which was previously written (and
348    /// will now be overwritten) has never been read.  In contrast,
349    /// `write_no_discard()` only returns a `RefMut<T>` if no unread
350    /// data will be overwritten by this write. Returns `None` if the buffer
351    /// already contains unread data or if a previous `RefMut<T>` has not
352    /// yet been dropped.
353    pub fn write_no_discard(&self) -> Option<RefMut<T>> {
354        RefMut::new(&self, false)
355    }
356    /// When the ping-pong buffer is used safely, reading is
357    /// automatically marked as complete when the `Ref<T>` is dropped.
358    /// This mechanism may be circumvented by forgetting a `Ref<T>` (so
359    /// that its destructor doesn't run), or by acquiring a raw pointer
360    /// from `read_unchecked()`.  In these cases, `release_read()` should be
361    /// called when there will be no more access to the data being read.
362    /// UNSAFE: any existing `Ref<T>` for this buffer, and any reference
363    /// previously returned by `read_unchecked()`, must be forgotten or dropped
364    /// before calling this function.
365    pub unsafe fn release_read(&self) {
366        self.state.release_read();
367    }
368    /// When the ping-pong buffer is used safely, writing is
369    /// automatically marked as complete when the `RefMut<T>` is dropped.
370    /// This mechanism may be circumvented by forgetting a `RefMut<T>` (so
371    /// that its destructor doesn't run), or by acquiring a raw pointer
372    /// from `write_unchecked()`.  In these cases, `release_write()` should be
373    /// called when there will be no more access to the data being written.
374    /// UNSAFE: any existing `RefMut<T>` for this buffer, and any reference
375    /// previously returned by `write_unchecked()`, must be forgotten or dropped
376    /// before calling this function.
377    pub unsafe fn release_write(&self) {
378        self.state.release_write();
379    }
380    /// `Buffer<T>::read_unchecked()` is logically equivalent to
381    /// `Buffer<T>::release_read()` followed by `&*Buffer<T>::read().unwrap()`.
382    /// Using `read_unchecked()` results in reduced execution time, because
383    /// only one atomic operation is needed (rather than two), and success is
384    /// guaranteed (so there is no need to deal with an `Option<Ref<T>>`).
385    /// UNSAFE: any existing `Ref<T>` for this buffer, and any reference
386    /// previously returned by `read_unchecked()`, must be forgotten or dropped
387    /// before calling this function.
388    pub unsafe fn read_unchecked(&self) -> &T {
389        &*self.get_pointer(self.state.release_and_lock_read(), true)
390    }
391    /// `Buffer<T>::write_unchecked()` is logically equivalent to
392    /// `Buffer<T>::release_write()` followed by `&*Buffer<T>::write().unwrap()`.
393    /// Using `write_unchecked()` results in reduced execution time, because
394    /// only one atomic operation is needed (rather than two), and success is
395    /// guaranteed (so there is no need to deal with an `Option<RefMut<T>>`).
396    /// UNSAFE: any existing `RefMut<T>` for this buffer, and any reference
397    /// previously returned by `write_unchecked()`, must be forgotten or dropped
398    /// before calling this function.
399    pub unsafe fn write_unchecked(&self) -> &mut T {
400        &mut *self.get_pointer(self.state.release_and_lock_write(), false)
401    }
402}
403
404unsafe impl<T: Send> Send for Buffer<T> {}
405/// `Buffer<T>` safely inherits Send and Sync from `T`
406/// because of the following guarantees which it enforces:
407///  1. Only one `Ref` associated with this buffer can exist at any time.
408///  2. Only one `RefMut` associated with this buffer can exist at any time.
409///  3. The `Ref` and the `RefMut` will point to different elements of the
410///     buffer.
411///  4. Whenever a `Ref` or `RefMut` is created or dropped,
412///     the buffer state is updated in a single atomic operation.
413unsafe impl<T: Sync> Sync for Buffer<T> {}