waitfree_sync/
triple_buffer.rs

1//! Wait-free single-producer single-consumer (SPSC) triple buffer to share data between two threads.
2//!
3//! # Example
4//! ```rust
5//! use waitfree_sync::triple_buffer;
6//!
7//! let (mut wr, mut rd) = triple_buffer::triple_buffer();
8//! wr.write(42);
9//! assert_eq!(wr.try_read(), Some(42));
10//! assert_eq!(rd.try_read(), Some(42));
11//! ```
12//!
13//!
14
15use crate::import::{Arc, AtomicUsize, Ordering, UnsafeCell};
16use crossbeam_utils::CachePadded;
17
18const NEW_DATA_FLAG: usize = 0b100;
19const INDEX_MASK: usize = 0b011;
20
21#[derive(Debug)]
22struct Shared<T: Sized> {
23    mem: [UnsafeCell<Option<T>>; 3],
24    latest_free: CachePadded<AtomicUsize>,
25}
26
27impl<T> Shared<T> {
28    fn new() -> Self {
29        Shared {
30            mem: [
31                UnsafeCell::new(None),
32                UnsafeCell::new(None),
33                UnsafeCell::new(None),
34            ],
35            latest_free: CachePadded::new(0.into()),
36        }
37    }
38}
39
40/// Create a new wait-free riple buffer.
41/// # Example
42/// ```rust
43/// use waitfree_sync::triple_buffer;
44///
45/// //               Data type ──╮   ╭─ Capacity
46/// let (wr, rd) = triple_buffer::triple_buffer::<u64>();
47/// ```
48pub fn triple_buffer<T>() -> (Writer<T>, Reader<T>) {
49    let chan = Arc::new(Shared::new());
50
51    let w = Writer::new(chan.clone());
52    let r = Reader::new(chan);
53    (w, r)
54}
55
56/// The reading side of the [triple_buffer].
57#[derive(Debug)]
58pub struct Reader<T> {
59    shared: Arc<Shared<T>>,
60    read_idx: usize,
61}
62unsafe impl<T: Send> Send for Reader<T> {}
63unsafe impl<T: Send> Sync for Reader<T> {}
64
65impl<T> Reader<T> {
66    fn new(raw_mem: Arc<Shared<T>>) -> Self {
67        Reader {
68            shared: raw_mem,
69            read_idx: 1,
70        }
71    }
72
73    /// Reads the latest available value.
74    /// Returns [None] if the [Writer] has not written anything yet.
75    #[inline]
76    pub fn try_read(&mut self) -> Option<T>
77    where
78        T: Clone,
79    {
80        let has_new_data = self.shared.latest_free.load(Ordering::Acquire) & NEW_DATA_FLAG > 0;
81        if has_new_data {
82            self.read_idx = self
83                .shared
84                .latest_free
85                .swap(self.read_idx, Ordering::AcqRel)
86                & INDEX_MASK;
87        }
88
89        #[cfg(loom)]
90        let val = unsafe { self.shared.mem[self.read_idx].get().deref() }.clone();
91        #[cfg(not(loom))]
92        let val = unsafe { &*self.shared.mem[self.read_idx].get() }.clone();
93        val
94    }
95}
96
97/// The writing side of the [triple_buffer].
98#[derive(Debug)]
99pub struct Writer<T> {
100    shared: Arc<Shared<T>>,
101    write_idx: usize,
102    last_written: Option<usize>,
103}
104unsafe impl<T: Send> Send for Writer<T> {}
105unsafe impl<T: Send> Sync for Writer<T> {}
106
107impl<T> Writer<T> {
108    fn new(raw_mem: Arc<Shared<T>>) -> Self {
109        Writer {
110            shared: raw_mem,
111            write_idx: 2,
112            last_written: None,
113        }
114    }
115
116    /// Reads the latest available value.
117    /// Returns [None] if the [Writer] has not written anything yet.
118    #[inline]
119    pub fn try_read(&mut self) -> Option<T>
120    where
121        T: Clone,
122    {
123        let last_written = self.last_written?;
124
125        #[cfg(loom)]
126        let val = unsafe { self.shared.mem[last_written].get().deref() }.clone();
127        #[cfg(not(loom))]
128        let val = unsafe { &*self.shared.mem[last_written].get() }.clone();
129        val
130    }
131
132    /// Writes an new value.
133    #[inline]
134    pub fn write(&mut self, data: T) {
135        #[cfg(loom)]
136        unsafe {
137            self.shared.mem[self.write_idx & INDEX_MASK]
138                .get_mut()
139                .with(|ptr| {
140                    let _ = ptr.replace(Some(data));
141                });
142        }
143        #[cfg(not(loom))]
144        // Drop old value and write new one
145        let _ = unsafe {
146            self.shared.mem[self.write_idx & INDEX_MASK]
147                .get()
148                .replace(Some(data))
149        };
150
151        // Store index
152        self.last_written = Some(self.write_idx & INDEX_MASK);
153        self.write_idx = self
154            .shared
155            .latest_free
156            .swap(self.write_idx | NEW_DATA_FLAG, Ordering::AcqRel);
157    }
158}
159
#[cfg(test)]
mod test {
    use super::*;

    /// A written value is visible through both the writer and the reader.
    #[test]
    fn smoke() {
        let payload = vec![0; 15];
        let (mut writer, mut reader) = triple_buffer();
        writer.write(payload.clone());

        assert_eq!(writer.try_read(), Some(payload.clone()));
        assert_eq!(reader.try_read(), Some(payload));
    }

    /// The reader sees `None` before the first write and the value afterwards.
    #[test]
    fn test_read_none() {
        let (mut writer, mut reader) = triple_buffer();
        assert_eq!(reader.try_read(), None);

        let payload = vec![0; 15];
        writer.write(payload.clone());
        assert_eq!(reader.try_read(), Some(payload));
    }
}