// atomic_pingpong/lib.rs
//! Lightweight ping-pong buffer intended for no_std targets.
//!
//! A ping-pong buffer is a two-element buffer which allows simultaneous access
//! by a single producer and a single consumer. One element is reserved for
//! writing by the producer, and the other element is reserved for reading by
//! the consumer. When writing and reading are finished, the roles of the two
//! elements are swapped (i.e. the one which was written will be next to be
//! read, and the one which was read will be next to be overwritten). This
//! approach avoids the need for memory copies, which improves performance when
//! the element size is large.
//!
//! The ping-pong buffer is specifically designed to allow simultaneous reading
//! and writing. However, the roles of the two elements can only be safely
//! swapped when neither reading nor writing is in progress. It is the user's
//! responsibility to ensure that the timing of reads and writes allows for this
//! to happen. If reads and writes are interleaved such that one or the other
//! is always in progress, then the roles of the buffer elements will never be
//! able to swap, and the reader will continue to read an old value rather than
//! the new values which are being written.
//!
//! A reference for reading is acquired by calling `Buffer<T>::read()`, and a
//! mutable reference for writing is acquired by calling `Buffer<T>::write()`.
//! The types returned are smart pointers (`Ref<T>` and `RefMut<T>`,
//! respectively), which automatically update the state of the ping-pong buffer
//! when they are dropped. Attempting to acquire a second reference for reading
//! or writing will fail if the first reference of that type has not been dropped.
//! To opt out of automatic reference management, a set of unsafe access functions
//! are available: `read_unchecked()`, `write_unchecked()`, `release_read()`, and
//! `release_write()`. These functions provide reduced runtime overhead but, of
//! course, care is required to use them safely.
//!
//! Ordinarily, calls to `read()` and `write()` are as permissive as possible:
//! `read()` succeeds unless reading is already in progress, and `write()`
//! succeeds unless writing is already in progress. Thus, depending on the
//! timing of `read()` and `write()` calls, certain data which is written may
//! never be read, and other data which is written may be read multiple times.
//! (This is an important distinction between a ping-pong buffer and a FIFO
//! ring buffer.) Alternative behavior is possible using the `read_once()`
//! function, which only returns a `Ref<T>` if it points to data which has not
//! yet been read, and the `write_no_discard()` function, which only returns a
//! `RefMut<T>` if the buffer does not currently contain unread data.
//!
//! The memory footprint of a `Buffer<T>` is two of `T` plus one additional byte
//! (an `AtomicU8`) which is used to synchronize access by the producer and
//! consumer. The runtime overhead from this implementation is less than about
//! twenty instructions to acquire or release a reference to the ping-pong
//! buffer (assuming function inlining is enabled). However, this crate can
//! only be used on targets which include atomic compare/swap in their
//! instruction sets.

#![no_std]

use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic;

// Basically all of the tricky synchronization logic for the ping-pong buffer
// lives in the BufferState implementation. The buffer state is a bitmask
// stored in an AtomicU8, rather than booleans or enums, in order to permit
// atomic updates to multiple flags at once. The custom BufferState type
// provides a convenient place for the associated functions and constants.
struct BufferState(atomic::AtomicU8);

/// A `Buffer<T>` consists of two copies of `T` plus one additional byte of
/// state.
pub struct Buffer<T> {
    // The two elements. Their read/write roles swap over time, as tracked by
    // the MODE_IS_FLIPPED bit in `state`.
    ping: UnsafeCell<T>,
    pong: UnsafeCell<T>,
    // Bitmask tracking the read/write locks, the current mode, and whether
    // unread data is present.
    state: BufferState,
}

/// Smart pointer for reading from a `Buffer<T>`.
/// Updates the buffer's state when dropped.
pub struct Ref<'a, T> {
    // Shared reference to the element currently designated for reading.
    ptr: &'a T,
    // Borrow of the owning buffer's state, used to release the read lock on drop.
    state: &'a BufferState,
}

/// Smart pointer for writing to a `Buffer<T>`.
/// Updates the buffer's state when dropped.
pub struct RefMut<'a, T> {
    // Exclusive reference to the element currently designated for writing.
    ptr: &'a mut T,
    // Borrow of the owning buffer's state, used to release the write lock on drop.
    state: &'a BufferState,
}

86impl BufferState {
87 // Bits of the bitmask:
88 const LOCK_READ: u8 = 0b0000_0001;
89 const LOCK_WRITE: u8 = 0b0000_0010;
90 const MODE_IS_FLIPPED: u8 = 0b0000_0100;
91 const WANT_MODE_CHANGE: u8 = 0b0000_1000;
92 const NEW_DATA_READY: u8 = 0b0001_0000;
93
94 const fn new() -> Self {
95 Self(atomic::AtomicU8::new(0))
96 }
97 /// If `condition()` is true, atomically update the state byte with
98 /// `action()` (using "Acquire" ordering) and return the current mode.
99 /// If `condition()` is false, return None without changing the state byte.
100 fn lock(&self, condition: fn(u8) -> bool, action: fn(u8) -> u8) -> Option<bool> {
101 let mut new_flags = None::<u8>;
102 let _ = self.0.fetch_update(
103 atomic::Ordering::Acquire,
104 atomic::Ordering::Relaxed,
105 |flags| {
106 if condition(flags) {
107 new_flags = Some(action(flags));
108 }
109 new_flags
110 },
111 );
112 new_flags.map(|f| f & Self::MODE_IS_FLIPPED != 0)
113 }
114 fn lock_read(&self, allow_repeated: bool) -> Option<bool> {
115 self.lock(
116 if allow_repeated {
117 // allow reading the same data multiple times
118 |flags| flags & Self::LOCK_READ == 0
119 } else {
120 // only lock for reading if there is new unread data
121 |flags| flags & (Self::LOCK_READ | Self::NEW_DATA_READY) == Self::NEW_DATA_READY
122 },
123 |flags| (flags | Self::LOCK_READ) & !Self::NEW_DATA_READY,
124 )
125 }
126 fn lock_write(&self, allow_repeated: bool) -> Option<bool> {
127 self.lock(
128 if allow_repeated {
129 // allow overwriting data which has not yet been read
130 |flags| flags & Self::LOCK_WRITE == 0
131 } else {
132 // only lock for writing if there is not any unread data
133 |flags| flags & (Self::LOCK_WRITE | Self::NEW_DATA_READY) == 0
134 },
135 |flags| flags | Self::LOCK_WRITE,
136 )
137 }
138 /// Atomically update the state byte with `action()`
139 /// (using "Release" ordering).
140 fn release(&self, action: fn(u8) -> u8) {
141 let _ = self.0.fetch_update(
142 atomic::Ordering::Release,
143 atomic::Ordering::Relaxed,
144 |flags| Some(action(flags)),
145 ); // always Ok because the closure always returns Some
146 }
147 fn release_read(&self) {
148 self.release(|mut flags| {
149 flags &= !Self::LOCK_READ;
150 if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
151 flags &= !Self::WANT_MODE_CHANGE;
152 flags ^= Self::MODE_IS_FLIPPED;
153 }
154 flags
155 })
156 }
157 fn release_write(&self) {
158 self.release(|mut flags| {
159 flags &= !Self::LOCK_WRITE;
160 flags |= Self::NEW_DATA_READY;
161 if flags & Self::LOCK_READ == 0 {
162 flags &= !Self::WANT_MODE_CHANGE;
163 flags ^= Self::MODE_IS_FLIPPED;
164 } else {
165 flags |= Self::WANT_MODE_CHANGE;
166 }
167 flags
168 })
169 }
170 /// Atomically update the state byte with `action()`
171 /// (using "AcqRel" ordering) and return the current mode.
172 fn release_and_lock(&self, action: fn(u8) -> u8) -> bool {
173 let mut new_flags = 0u8;
174 let _ = self.0.fetch_update(
175 atomic::Ordering::AcqRel,
176 atomic::Ordering::Relaxed,
177 |flags| {
178 new_flags = action(flags);
179 Some(new_flags)
180 },
181 ); // always Ok because the closure always returns Some
182 new_flags & Self::MODE_IS_FLIPPED != 0
183 }
184 fn release_and_lock_read(&self) -> bool {
185 self.release_and_lock(|mut flags| {
186 flags |= Self::LOCK_READ;
187 flags &= !Self::NEW_DATA_READY;
188 if flags & (Self::LOCK_WRITE | Self::WANT_MODE_CHANGE) == Self::WANT_MODE_CHANGE {
189 flags &= !Self::WANT_MODE_CHANGE;
190 flags ^= Self::MODE_IS_FLIPPED;
191 }
192 flags
193 })
194 }
195 fn release_and_lock_write(&self) -> bool {
196 self.release_and_lock(|mut flags| {
197 if flags & Self::LOCK_WRITE != 0 {
198 flags |= Self::NEW_DATA_READY;
199 if flags & Self::LOCK_READ == 0 {
200 flags &= !Self::WANT_MODE_CHANGE;
201 flags ^= Self::MODE_IS_FLIPPED;
202 } else {
203 flags |= Self::WANT_MODE_CHANGE;
204 }
205 } else {
206 flags |= Self::LOCK_WRITE;
207 }
208 flags
209 })
210 }
211}
212
213impl<'a, T> Ref<'a, T> {
214 fn new(buf: &'a Buffer<T>, allow_repeated: bool) -> Option<Self> {
215 let mode = buf.state.lock_read(allow_repeated)?;
216 // If we get here, lock_read() succeeded, so it's safe to access the UnsafeCell
217 // which is currently designated for reading.
218 Some(Ref {
219 ptr: unsafe { &*buf.get_pointer(mode, true) },
220 state: &buf.state,
221 })
222 }
223}
224
225impl<'a, T> RefMut<'a, T> {
226 fn new(buf: &'a Buffer<T>, allow_repeated: bool) -> Option<Self> {
227 let mode = buf.state.lock_write(allow_repeated)?;
228 // If we get here, lock_write() succeeded, so it's safe to access the UnsafeCell
229 // which is currently designated for writing.
230 Some(RefMut {
231 ptr: unsafe { &mut *buf.get_pointer(mode, false) },
232 state: &buf.state,
233 })
234 }
235}
236
impl<'a, T> Drop for Ref<'a, T> {
    /// When a `Ref<'a, T>` is dropped, the state of the corresponding
    /// `Buffer<T>` is automatically updated.
    fn drop(&mut self) {
        // Clears the read lock; if a write completed while this read was in
        // progress, the element roles are swapped now that it is safe to.
        self.state.release_read();
    }
}

impl<'a, T> Drop for RefMut<'a, T> {
    /// When a `RefMut<'a, T>` is dropped, the state of the corresponding
    /// `Buffer<T>` is automatically updated.
    fn drop(&mut self) {
        // Clears the write lock and marks the new data as ready; the element
        // roles swap immediately, or the swap is deferred if a read is active.
        self.state.release_write();
    }
}

impl<'a, T> core::ops::Deref for Ref<'a, T> {
    /// `Ref<'a, T>` dereferences to a `T` element of the `Buffer<T>`.
    type Target = T;
    fn deref(&self) -> &T {
        self.ptr
    }
}

impl<'a, T> core::ops::Deref for RefMut<'a, T> {
    /// `RefMut<'a, T>` dereferences to a `T` element of the `Buffer<T>`.
    type Target = T;
    /// Dereferences the value.
    /// (Required in order to support `deref_mut`;
    /// not likely to be useful on its own.)
    fn deref(&self) -> &T {
        self.ptr
    }
}

impl<'a, T> core::ops::DerefMut for RefMut<'a, T> {
    /// Mutably dereferences to the buffer element reserved for writing.
    fn deref_mut(&mut self) -> &mut T {
        self.ptr
    }
}

impl<T: Copy> Buffer<T> {
    /// Returns a new ping-pong buffer with the elements initialized to the
    /// specified value.
    ///
    /// `T: Copy` is required because `value` is used to initialize both
    /// elements.
    pub const fn new(value: T) -> Self {
        Buffer {
            ping: UnsafeCell::new(value),
            pong: UnsafeCell::new(value),
            state: BufferState::new(),
        }
    }
}

290impl<T: Default> Buffer<T> {
291 /// Returns a new ping-pong buffer with the elements initialized to their
292 /// default value.
293 pub fn default() -> Self {
294 Buffer {
295 ping: UnsafeCell::default(),
296 pong: UnsafeCell::default(),
297 state: BufferState::new(),
298 }
299 }
300}
301
impl<T> Buffer<MaybeUninit<T>> {
    /// Returns a new ping-pong buffer with uninitialized elements.
    ///
    /// The caller is responsible for writing an initialized value before
    /// treating data read from the buffer as initialized (per the usual
    /// `MaybeUninit` contract).
    pub const fn uninit() -> Self {
        Buffer {
            ping: UnsafeCell::new(MaybeUninit::uninit()),
            pong: UnsafeCell::new(MaybeUninit::uninit()),
            state: BufferState::new(),
        }
    }
}

313impl<T> Buffer<T> {
314 const fn get_pointer(&self, state: bool, read: bool) -> *mut T {
315 // state = false => read ping and write pong
316 // state = true => read pong and write ping
317 (if state ^ read { &self.ping } else { &self.pong }).get()
318 }
319 /// Returns a `Ref<T>` smart pointer providing read-only access to the
320 /// ping-pong buffer, or `None` if the `Ref<T>` from a previous call has
321 /// not been dropped yet. If a call to `write` previously finished and
322 /// the ping-pong buffer was able to swap, the `T` element pointed to by
323 /// the reference will be a value that was previously written.
324 /// Otherwise, the `T` element will have its specified initial value based
325 /// on the function which was used to construct the ping-pong buffer.
326 pub fn read(&self) -> Option<Ref<T>> {
327 Ref::new(&self, true)
328 }
329 /// Ordinarily, the `read()` function allows the same data to be read
330 /// multiple times, and it allows the initial value to be read prior to
331 /// any calls to `write()`. In contrast, `read_once()` only returns a
332 /// `Ref<T>` if it points to new data which has been written into the
333 /// buffer and not yet read. Returns `None` if new data is not available
334 /// to read or if a previous `Ref<T>` has not yet been dropped.
335 pub fn read_once(&self) -> Option<Ref<T>> {
336 Ref::new(&self, false)
337 }
338 /// Returns a `RefMut<T>` smart pointer providing mutable access to the
339 /// ping-pong buffer, or `None` if the `RefMut<T>` from a previous call
340 /// has not been dropped yet. Due to the nature of the ping-pong buffer,
341 /// the `T` element pointed to by the reference may have an arbitrary
342 /// starting value prior to being overwritten by the caller.
343 pub fn write(&self) -> Option<RefMut<T>> {
344 RefMut::new(&self, true)
345 }
346 /// Ordinarily, the `write()` function allows an arbitrary number of
347 /// sequential writes, even if data which was previously written (and
348 /// will now be overwritten) has never been read. In contrast,
349 /// `write_no_discard()` only returns a `RefMut<T>` if no unread
350 /// data will be overwritten by this write. Returns `None` if the buffer
351 /// already contains unread data or if a previous `RefMut<T>` has not
352 /// yet been dropped.
353 pub fn write_no_discard(&self) -> Option<RefMut<T>> {
354 RefMut::new(&self, false)
355 }
356 /// When the ping-pong buffer is used safely, reading is
357 /// automatically marked as complete when the `Ref<T>` is dropped.
358 /// This mechanism may be circumvented by forgetting a `Ref<T>` (so
359 /// that its destructor doesn't run), or by acquiring a raw pointer
360 /// from `read_unchecked()`. In these cases, `release_read()` should be
361 /// called when there will be no more access to the data being read.
362 /// UNSAFE: any existing `Ref<T>` for this buffer, and any reference
363 /// previously returned by `read_unchecked()`, must be forgotten or dropped
364 /// before calling this function.
365 pub unsafe fn release_read(&self) {
366 self.state.release_read();
367 }
368 /// When the ping-pong buffer is used safely, writing is
369 /// automatically marked as complete when the `RefMut<T>` is dropped.
370 /// This mechanism may be circumvented by forgetting a `RefMut<T>` (so
371 /// that its destructor doesn't run), or by acquiring a raw pointer
372 /// from `write_unchecked()`. In these cases, `release_write()` should be
373 /// called when there will be no more access to the data being written.
374 /// UNSAFE: any existing `RefMut<T>` for this buffer, and any reference
375 /// previously returned by `write_unchecked()`, must be forgotten or dropped
376 /// before calling this function.
377 pub unsafe fn release_write(&self) {
378 self.state.release_write();
379 }
380 /// `Buffer<T>::read_unchecked()` is logically equivalent to
381 /// `Buffer<T>::release_read()` followed by `&*Buffer<T>::read().unwrap()`.
382 /// Using `read_unchecked()` results in reduced execution time, because
383 /// only one atomic operation is needed (rather than two), and success is
384 /// guaranteed (so there is no need to deal with an `Option<Ref<T>>`).
385 /// UNSAFE: any existing `Ref<T>` for this buffer, and any reference
386 /// previously returned by `read_unchecked()`, must be forgotten or dropped
387 /// before calling this function.
388 pub unsafe fn read_unchecked(&self) -> &T {
389 &*self.get_pointer(self.state.release_and_lock_read(), true)
390 }
391 /// `Buffer<T>::write_unchecked()` is logically equivalent to
392 /// `Buffer<T>::release_write()` followed by `&*Buffer<T>::write().unwrap()`.
393 /// Using `write_unchecked()` results in reduced execution time, because
394 /// only one atomic operation is needed (rather than two), and success is
395 /// guaranteed (so there is no need to deal with an `Option<RefMut<T>>`).
396 /// UNSAFE: any existing `RefMut<T>` for this buffer, and any reference
397 /// previously returned by `write_unchecked()`, must be forgotten or dropped
398 /// before calling this function.
399 pub unsafe fn write_unchecked(&self) -> &mut T {
400 &mut *self.get_pointer(self.state.release_and_lock_write(), false)
401 }
402}
403
// SAFETY: `Buffer<T>` safely inherits Send and Sync from `T`
// because of the following guarantees which it enforces:
// 1. Only one `Ref` associated with this buffer can exist at any time.
// 2. Only one `RefMut` associated with this buffer can exist at any time.
// 3. The `Ref` and the `RefMut` will point to different elements of the
//    buffer.
// 4. Whenever a `Ref` or `RefMut` is created or dropped,
//    the buffer state is updated in a single atomic operation.
unsafe impl<T: Send> Send for Buffer<T> {}
unsafe impl<T: Sync> Sync for Buffer<T> {}