atb/
lib.rs

//! A simple lock-free, thread-safe SPSC triple buffer.
//!
//! Supports `no_std`, performs no allocations, and never blocks.
//!
//! The buffer can be wrapped in an [`Arc`](std::sync::Arc) and sent to two different threads. The
//! producer thread can periodically call [`back_buffers`](AtomicTripleBuffer::back_buffers) to
//! write to the back buffer, and then call [`swap`](TBBackGuard::swap) to commit the back buffer.
//! Separately, the consumer thread can periodically call
//! [`front_buffer`](AtomicTripleBuffer::front_buffer) to read from the front buffer.
//!
//! Swapping is kept fast by simply updating a single atomic value storing the indexes of the front
//! and back buffers. No data is moved when swapping buffers.
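//!
//! A minimal usage sketch of that pattern (this example assumes the crate is named `atb`):
//!
//! ```
//! use std::sync::Arc;
//!
//! use atb::AtomicTripleBuffer;
//!
//! let buf = Arc::new(AtomicTripleBuffer::new(0u32));
//!
//! // Producer: write to the back buffer, then commit it.
//! let producer = {
//!     let buf = Arc::clone(&buf);
//!     std::thread::spawn(move || {
//!         let mut bufs = buf.back_buffers().expect("back buffer already locked");
//!         *bufs.back_mut() = 42;
//!         bufs.swap();
//!     })
//! };
//! producer.join().unwrap();
//!
//! // Consumer: read the most recently committed value from the front buffer.
//! let front = buf.front_buffer().expect("front buffer already locked");
//! assert_eq!(*front, 42);
//! ```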

#![cfg_attr(all(not(test), not(feature = "std")), no_std)]

use core::cell::UnsafeCell;

// Packed buffer state: lock flags, a swap-pending flag, and the 2-bit indexes of the front,
// back (work), and pending buffers, all stored in a single `u16` so it can be updated atomically.
bitfield::bitfield! {
    #[derive(Copy, Clone, Eq, PartialEq)]
    #[repr(transparent)]
    struct BufferStatus(u16);
    impl Debug;
    #[inline]
    swap_pending, set_swap_pending: 0;
    #[inline]
    back_locked, set_back_locked: 1;
    #[inline]
    front_locked, set_front_locked: 2;
    #[inline]
    u8, front_index, set_front_index: 4, 3;
    #[inline]
    u8, work_index, set_work_index: 6, 5;
    #[inline]
    u8, pending_index, set_pending_index: 8, 7;
}

impl Default for BufferStatus {
    #[inline]
    fn default() -> Self {
        // Fresh status: nothing locked or pending; front = 0, back (work) = 1, pending = 2.
        let mut status = Self(0);
        status.set_work_index(1);
        status.set_pending_index(2);
        status
    }
}

#[repr(transparent)]
#[derive(Debug)]
struct AtomicStatus(core::sync::atomic::AtomicU16);

impl Default for AtomicStatus {
    #[inline]
    fn default() -> Self {
        Self(core::sync::atomic::AtomicU16::new(
            BufferStatus::default().0,
        ))
    }
}

impl AtomicStatus {
    #[inline]
    fn fetch_update<F>(&self, mut f: F) -> Result<BufferStatus, BufferStatus>
    where
        F: FnMut(BufferStatus) -> Option<BufferStatus>,
    {
        use core::sync::atomic::Ordering;
        self.0
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |s| {
                f(BufferStatus(s)).map(|s| s.0)
            })
            .map(BufferStatus)
            .map_err(BufferStatus)
    }
}

/// A container holding three instances of `T` that can be used as a queue.
///
/// The three instances correspond to three buffers: front buffer, back buffer, and pending
/// buffer. When the back buffer is written, it can be "committed" to swap it with the
/// pending buffer. The next time an attempt is made to read the front buffer, it will be swapped
/// with any pending buffer before reading.
///
/// The front buffer can be locked by one thread at a time and the back buffer can be locked by
/// another thread. This restriction ensures only one thread has access to each buffer at a time,
/// making it safe to access the `AtomicTripleBuffer` from multiple threads without `T` being
/// [`Sync`](core::marker::Sync).
///
/// Locking and swapping are both accomplished by adjusting a single lock-free flag holding the
/// indexes of the buffers. No copying is done when swapping buffers.
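///
/// A minimal single-threaded sketch of the write/commit/read cycle (this example assumes the
/// crate is named `atb`):
///
/// ```
/// use atb::AtomicTripleBuffer;
///
/// let buf = AtomicTripleBuffer::new(String::new());
///
/// // Write to the back buffer, then commit it.
/// let mut back = buf.back_buffers().unwrap();
/// *back.back_mut() = String::from("hello");
/// back.swap();
///
/// // The committed value becomes the front buffer the next time it is locked.
/// let front = buf.front_buffer().unwrap();
/// assert_eq!(*front, "hello");
/// ```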
#[derive(Debug)]
pub struct AtomicTripleBuffer<T> {
    buffers: [UnsafeCell<T>; 3],
    status: AtomicStatus,
}

unsafe impl<T: Send> Send for AtomicTripleBuffer<T> {}
unsafe impl<T: Send> Sync for AtomicTripleBuffer<T> {}

impl<T: Default> Default for AtomicTripleBuffer<T> {
    #[inline]
    fn default() -> Self {
        Self {
            buffers: [Default::default(), Default::default(), Default::default()],
            status: Default::default(),
        }
    }
}

/// An error that can be returned when trying to lock a buffer.
#[derive(Debug)]
pub enum TBLockError {
    /// The buffer is already locked by another guard.
    AlreadyLocked,
}

#[cfg(feature = "std")]
impl std::error::Error for TBLockError {}

impl core::fmt::Display for TBLockError {
    fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
        fmt.write_str("buffer already locked")
    }
}

impl<T> AtomicTripleBuffer<T> {
    /// Creates a new `AtomicTripleBuffer` with all buffers set to `init`.
    pub fn new(init: T) -> Self
    where
        T: Clone,
    {
        Self {
            buffers: [
                UnsafeCell::new(init.clone()),
                UnsafeCell::new(init.clone()),
                UnsafeCell::new(init),
            ],
            status: Default::default(),
        }
    }
    /// Tries to lock the front buffer for access. Returns [`TBLockError::AlreadyLocked`] if the
    /// front buffer is already locked by another thread. If a swap is pending, this method will
    /// atomically swap the pending and front buffers and clear the pending flag at the same time
    /// the front buffer is locked.
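    ///
    /// A short usage sketch (this example assumes the crate is named `atb`):
    ///
    /// ```
    /// use atb::AtomicTripleBuffer;
    ///
    /// let buf = AtomicTripleBuffer::new(1u32);
    /// let front = buf.front_buffer().unwrap();
    /// assert_eq!(*front, 1);
    /// // A second attempt to lock the front buffer fails while the guard is alive.
    /// assert!(buf.front_buffer().is_err());
    /// ```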
    pub fn front_buffer(&self) -> Result<TBFrontGuard<'_, T>, TBLockError> {
        let mut front_index = 0;
        self.status
            .fetch_update(|mut status| {
                if status.front_locked() {
                    return None;
                }
                status.set_front_locked(true);
                if status.swap_pending() {
                    // Pick up the committed buffer: swap the pending and front indexes.
                    status.set_swap_pending(false);
                    front_index = status.pending_index();
                    status.set_pending_index(status.front_index());
                    status.set_front_index(front_index);
                } else {
                    front_index = status.front_index();
                }
                Some(status)
            })
            .map_err(|_| TBLockError::AlreadyLocked)?;
        Ok(TBFrontGuard {
            cell: &self.buffers[front_index as usize],
            status: &self.status,
        })
    }
    /// Tries to lock the back buffer for access. Returns [`TBLockError::AlreadyLocked`] if the
    /// back buffer is already locked by another thread. Also allows access to the pending buffer
    /// if a swap is not pending.
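    ///
    /// A short usage sketch (this example assumes the crate is named `atb`):
    ///
    /// ```
    /// use atb::AtomicTripleBuffer;
    ///
    /// let buf = AtomicTripleBuffer::new(0u8);
    /// let mut bufs = buf.back_buffers().unwrap();
    /// *bufs.back_mut() = 7;
    /// bufs.swap();
    /// assert_eq!(*buf.front_buffer().unwrap(), 7);
    /// ```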
    pub fn back_buffers(&self) -> Result<TBBackGuard<'_, T>, TBLockError> {
        let mut locked_status = BufferStatus::default();
        self.status
            .fetch_update(|mut status| {
                if status.back_locked() {
                    return None;
                }
                status.set_back_locked(true);
                locked_status = status;
                Some(status)
            })
            .map_err(|_| TBLockError::AlreadyLocked)?;
        Ok(TBBackGuard {
            bufs: self,
            locked_status,
        })
    }
}

/// An RAII guard holding a lock on the front buffer.
///
/// When the structure is dropped (falls out of scope), the front buffer will be unlocked. The
/// front buffer data can be accessed via the [`Deref`](core::ops::Deref) and
/// [`DerefMut`](core::ops::DerefMut) implementations.
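///
/// A short sketch of access through the guard (this example assumes the crate is named `atb`):
///
/// ```
/// use atb::AtomicTripleBuffer;
///
/// let buf = AtomicTripleBuffer::new(vec![1, 2, 3]);
/// let mut front = buf.front_buffer().unwrap();
/// front.push(4); // mutate in place via `DerefMut`
/// assert_eq!(front.len(), 4);
/// drop(front); // unlocks the front buffer
/// assert!(buf.front_buffer().is_ok());
/// ```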
#[derive(Debug)]
pub struct TBFrontGuard<'a, T> {
    cell: &'a UnsafeCell<T>,
    status: &'a AtomicStatus,
}

impl<T> core::ops::Deref for TBFrontGuard<'_, T> {
    type Target = T;
    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe { &*self.cell.get() }
    }
}

impl<T> core::ops::DerefMut for TBFrontGuard<'_, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { &mut *self.cell.get() }
    }
}

impl<T: core::fmt::Display> core::fmt::Display for TBFrontGuard<'_, T> {
    #[inline]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        (**self).fmt(f)
    }
}

impl<T> Drop for TBFrontGuard<'_, T> {
    fn drop(&mut self) {
        // Unlock the front buffer. The closure always returns `Some`, so this cannot fail.
        self.status
            .fetch_update(|mut status| {
                status.set_front_locked(false);
                Some(status)
            })
            .ok();
    }
}

unsafe impl<T: Send> Send for TBFrontGuard<'_, T> {}
unsafe impl<T: Send + Sync> Sync for TBFrontGuard<'_, T> {}

/// An RAII guard holding a lock on the back buffer.
///
/// The back buffer can be accessed via the [`back`](TBBackGuard::back) and
/// [`back_mut`](TBBackGuard::back_mut) methods and committed with the [`swap`](TBBackGuard::swap)
/// method. After swapping, the back buffer will be in an indeterminate state and may be either the
/// previous front buffer or the previous pending buffer. Callers should usually ensure the back
/// buffer is fully cleared or rewritten after acquiring the lock and before calling
/// [`swap`](TBBackGuard::swap). Dropping the guard without calling [`swap`](TBBackGuard::swap)
/// unlocks the back buffer, leaving it in the same state for the next time it is locked.
///
/// The [`pending`](TBBackGuard::pending) and [`pending_mut`](TBBackGuard::pending_mut) methods can
/// also be used to access the pending buffer if no swap was pending at the time this guard was
/// created. The primary reason to access the pending buffer is to free its data without having to
/// wait for the next swap.
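///
/// A short usage sketch (this example assumes the crate is named `atb`):
///
/// ```
/// use atb::AtomicTripleBuffer;
///
/// let buf = AtomicTripleBuffer::new(Vec::<u8>::new());
///
/// let mut bufs = buf.back_buffers().unwrap();
/// // Rewrite the back buffer completely; its previous contents are stale.
/// bufs.back_mut().clear();
/// bufs.back_mut().extend_from_slice(b"frame");
/// // Optionally free stale data in the pending buffer while holding the lock.
/// if let Some(pending) = bufs.pending_mut() {
///     pending.clear();
/// }
/// bufs.swap();
///
/// let front = buf.front_buffer().unwrap();
/// assert_eq!(front.as_slice(), b"frame");
/// ```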
#[derive(Debug)]
pub struct TBBackGuard<'a, T> {
    bufs: &'a AtomicTripleBuffer<T>,
    locked_status: BufferStatus,
}

impl<T> TBBackGuard<'_, T> {
    /// Returns a reference to the back buffer.
    pub fn back(&self) -> &T {
        let index = self.locked_status.work_index() as usize;
        unsafe { &*self.bufs.buffers[index].get() }
    }
    /// Returns a mutable reference to the back buffer.
    pub fn back_mut(&mut self) -> &mut T {
        let index = self.locked_status.work_index() as usize;
        unsafe { &mut *self.bufs.buffers[index].get() }
    }
    /// Returns a reference to the pending buffer, or `None` if a swap was pending at the time the
    /// guard was created.
    pub fn pending(&self) -> Option<&T> {
        if self.locked_status.swap_pending() {
            return None;
        }
        let index = self.locked_status.pending_index() as usize;
        Some(unsafe { &*self.bufs.buffers[index].get() })
    }
    /// Returns a mutable reference to the pending buffer, or `None` if a swap was pending at the
    /// time the guard was created.
    pub fn pending_mut(&mut self) -> Option<&mut T> {
        if self.locked_status.swap_pending() {
            return None;
        }
        let index = self.locked_status.pending_index() as usize;
        Some(unsafe { &mut *self.bufs.buffers[index].get() })
    }
    /// Commits the back buffer: swaps it with the pending buffer, marks a swap as pending, and
    /// unlocks the back buffer, consuming the guard.
    pub fn swap(self) {
        self.bufs
            .status
            .fetch_update(|mut status| {
                status.set_back_locked(false);
                status.set_swap_pending(true);
                let pending_index = status.work_index();
                status.set_work_index(status.pending_index());
                status.set_pending_index(pending_index);
                Some(status)
            })
            .ok();
        // The lock was already released above; skip `Drop` so it is not released a second time.
        core::mem::forget(self);
    }
}

impl<T: core::fmt::Display> core::fmt::Display for TBBackGuard<'_, T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        self.back().fmt(f)
    }
}

impl<T> Drop for TBBackGuard<'_, T> {
    fn drop(&mut self) {
        // Unlock the back buffer without committing it.
        self.bufs
            .status
            .fetch_update(|mut status| {
                status.set_back_locked(false);
                Some(status)
            })
            .ok();
    }
}

unsafe impl<T: Send> Send for TBBackGuard<'_, T> {}
unsafe impl<T: Send + Sync> Sync for TBBackGuard<'_, T> {}

#[cfg(test)]
mod tests {
    use super::AtomicTripleBuffer;
    use std::sync::Arc;

    #[test]
    fn basic() {
        #[derive(Clone, Default)]
        struct Data {
            a: i32,
            b: i32,
        }

        let buf = Arc::new(AtomicTripleBuffer::<Data>::default());

        // Consumer: repeatedly read the front buffer and check that it only moves forward.
        let b = buf.clone();
        let thread = std::thread::spawn(move || {
            let mut prev = Data::default();
            loop {
                let front = b.front_buffer().unwrap();
                assert!(front.a == front.b);
                assert!(front.a >= prev.a && front.b >= prev.b);
                if front.a >= 10000 {
                    break;
                }
                prev = (*front).clone();
            }
        });

        // Producer: write increasing values to the back buffer and commit each one.
        let mut data = Data::default();
        for _ in 0..10000 {
            data.a += 1;
            data.b += 1;
            let mut bufs = buf.back_buffers().unwrap();
            *bufs.back_mut() = data.clone();
            bufs.swap();
        }

        thread.join().unwrap();
    }
}