1#![cfg_attr(all(not(test), not(feature = "std")), no_std)]
15
16use core::cell::UnsafeCell;
17
18bitfield::bitfield! {
19 #[derive(Copy, Clone, Eq, PartialEq)]
20 #[repr(transparent)]
21 struct BufferStatus(u16);
22 impl Debug;
23 #[inline]
24 swap_pending, set_swap_pending: 0;
25 #[inline]
26 back_locked, set_back_locked: 1;
27 #[inline]
28 front_locked, set_front_locked: 2;
29 #[inline]
30 u8, front_index, set_front_index: 4, 3;
31 #[inline]
32 u8, work_index, set_work_index: 6, 5;
33 #[inline]
34 u8, pending_index, set_pending_index: 8, 7;
35}
36
37impl Default for BufferStatus {
38 #[inline]
39 fn default() -> Self {
40 let mut status = Self(0);
41 status.set_work_index(1);
42 status.set_pending_index(2);
43 status
44 }
45}
46
47#[repr(transparent)]
48#[derive(Debug)]
49struct AtomicStatus(core::sync::atomic::AtomicU16);
50
51impl Default for AtomicStatus {
52 #[inline]
53 fn default() -> Self {
54 Self(core::sync::atomic::AtomicU16::new(
55 BufferStatus::default().0,
56 ))
57 }
58}
59
60impl AtomicStatus {
61 #[inline]
62 fn fetch_update<F>(&self, mut f: F) -> Result<BufferStatus, BufferStatus>
63 where
64 F: FnMut(BufferStatus) -> Option<BufferStatus>,
65 {
66 use core::sync::atomic::Ordering;
67 self.0
68 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |s| {
69 f(BufferStatus(s)).map(|s| s.0)
70 })
71 .map(BufferStatus)
72 .map_err(BufferStatus)
73 }
74}
75
76#[derive(Debug)]
91pub struct AtomicTripleBuffer<T> {
92 buffers: [UnsafeCell<T>; 3],
93 status: AtomicStatus,
94}
95
96unsafe impl<T: Send> Send for AtomicTripleBuffer<T> {}
97unsafe impl<T: Send> Sync for AtomicTripleBuffer<T> {}
98
99impl<T: Default> Default for AtomicTripleBuffer<T> {
100 #[inline]
101 fn default() -> Self {
102 Self {
103 buffers: [Default::default(), Default::default(), Default::default()],
104 status: Default::default(),
105 }
106 }
107}
108
109#[derive(Debug)]
111pub enum TBLockError {
112 AlreadyLocked,
113}
114
115#[cfg(feature = "std")]
116impl std::error::Error for TBLockError {}
117
118impl core::fmt::Display for TBLockError {
119 fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
120 fmt.write_str("buffer already locked")
121 }
122}
123
124impl<T> AtomicTripleBuffer<T> {
125 pub fn new(init: T) -> Self
127 where
128 T: Clone,
129 {
130 Self {
131 buffers: [
132 UnsafeCell::new(init.clone()),
133 UnsafeCell::new(init.clone()),
134 UnsafeCell::new(init),
135 ],
136 status: Default::default(),
137 }
138 }
139 pub fn front_buffer(&self) -> Result<TBFrontGuard<'_, T>, TBLockError> {
144 let mut front_index = 0;
145 self.status
146 .fetch_update(|mut status| {
147 if status.front_locked() {
148 return None;
149 }
150 status.set_front_locked(true);
151 if status.swap_pending() {
152 status.set_swap_pending(false);
153 front_index = status.pending_index();
154 status.set_pending_index(status.front_index());
155 status.set_front_index(front_index);
156 } else {
157 front_index = status.front_index();
158 }
159 Some(status)
160 })
161 .map_err(|_| TBLockError::AlreadyLocked)?;
162 Ok(TBFrontGuard {
163 cell: &self.buffers[front_index as usize],
164 status: &self.status,
165 })
166 }
167 pub fn back_buffers(&self) -> Result<TBBackGuard<'_, T>, TBLockError> {
171 let mut locked_status = BufferStatus::default();
172 self.status
173 .fetch_update(|mut status| {
174 if status.back_locked() {
175 return None;
176 }
177 status.set_back_locked(true);
178 locked_status = status;
179 Some(status)
180 })
181 .map_err(|_| TBLockError::AlreadyLocked)?;
182 Ok(TBBackGuard {
183 bufs: self,
184 locked_status,
185 })
186 }
187}
188
189#[derive(Debug)]
195pub struct TBFrontGuard<'a, T> {
196 cell: &'a UnsafeCell<T>,
197 status: &'a AtomicStatus,
198}
199
200impl<T> core::ops::Deref for TBFrontGuard<'_, T> {
201 type Target = T;
202 #[inline]
203 fn deref(&self) -> &Self::Target {
204 unsafe { &*self.cell.get() }
205 }
206}
207
208impl<T> core::ops::DerefMut for TBFrontGuard<'_, T> {
209 #[inline]
210 fn deref_mut(&mut self) -> &mut Self::Target {
211 unsafe { &mut *self.cell.get() }
212 }
213}
214
215impl<T: core::fmt::Display> core::fmt::Display for TBFrontGuard<'_, T> {
216 #[inline]
217 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
218 (**self).fmt(f)
219 }
220}
221
222impl<T> Drop for TBFrontGuard<'_, T> {
223 fn drop(&mut self) {
224 self.status
225 .fetch_update(|mut status| {
226 status.set_front_locked(false);
227 Some(status)
228 })
229 .ok();
230 }
231}
232
233unsafe impl<T: Send> Send for TBFrontGuard<'_, T> {}
234unsafe impl<T: Send + Sync> Sync for TBFrontGuard<'_, T> {}
235
236#[derive(Debug)]
252pub struct TBBackGuard<'a, T> {
253 bufs: &'a AtomicTripleBuffer<T>,
254 locked_status: BufferStatus,
255}
256
257impl<T> TBBackGuard<'_, T> {
258 pub fn back(&self) -> &T {
260 let index = self.locked_status.work_index() as usize;
261 unsafe { &*self.bufs.buffers[index].get() }
262 }
263 pub fn back_mut(&mut self) -> &mut T {
265 let index = self.locked_status.work_index() as usize;
266 unsafe { &mut *self.bufs.buffers[index].get() }
267 }
268 pub fn pending(&self) -> Option<&T> {
271 if self.locked_status.swap_pending() {
272 return None;
273 }
274 let index = self.locked_status.pending_index() as usize;
275 Some(unsafe { &*self.bufs.buffers[index].get() })
276 }
277 pub fn pending_mut(&mut self) -> Option<&mut T> {
280 if self.locked_status.swap_pending() {
281 return None;
282 }
283 let index = self.locked_status.pending_index() as usize;
284 Some(unsafe { &mut *self.bufs.buffers[index].get() })
285 }
286 pub fn swap(self) {
289 self.bufs
290 .status
291 .fetch_update(|mut status| {
292 status.set_back_locked(false);
293 status.set_swap_pending(true);
294 let pending_index = status.work_index();
295 status.set_work_index(status.pending_index());
296 status.set_pending_index(pending_index);
297 Some(status)
298 })
299 .ok();
300 core::mem::forget(self);
301 }
302}
303
304impl<T: core::fmt::Display> core::fmt::Display for TBBackGuard<'_, T> {
305 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
306 self.back().fmt(f)
307 }
308}
309
310impl<T> Drop for TBBackGuard<'_, T> {
311 fn drop(&mut self) {
312 self.bufs
313 .status
314 .fetch_update(|mut status| {
315 status.set_back_locked(false);
316 Some(status)
317 })
318 .ok();
319 }
320}
321
322unsafe impl<T: Send> Send for TBBackGuard<'_, T> {}
323unsafe impl<T: Send + Sync> Sync for TBBackGuard<'_, T> {}
324
325#[cfg(test)]
326mod tests {
327 use super::AtomicTripleBuffer;
328 use std::sync::Arc;
329
330 #[test]
331 fn basic() {
332 #[derive(Clone, Default)]
333 struct Data {
334 a: i32,
335 b: i32,
336 }
337
338 let buf = Arc::new(AtomicTripleBuffer::<Data>::default());
339
340 let b = buf.clone();
341 let thread = std::thread::spawn(move || {
342 let mut prev = Data::default();
343 loop {
344 let front = b.front_buffer().unwrap();
345 assert!(front.a == front.b);
346 assert!(front.a >= prev.a && front.b >= prev.b);
347 if front.a >= 10000 {
348 break;
349 }
350 prev = (*front).clone();
351 }
352 });
353
354 let mut data = Data::default();
355 for _ in 0..10000 {
356 data.a += 1;
357 data.b += 1;
358 let mut bufs = buf.back_buffers().unwrap();
359 *bufs.back_mut() = data.clone();
360 bufs.swap();
361 }
362
363 thread.join().unwrap();
364 }
365}