//! waitfree_sync/triple_buffer.rs — wait-free triple buffer: a single writer
//! publishes values that a single reader consumes, with no locks and no
//! blocking on either side.

use crate::import::{Arc, AtomicUsize, Ordering, UnsafeCell};
use crossbeam_utils::CachePadded;

/// Bit 2 of `latest_free`: set when the slot it names holds unread data.
const NEW_DATA_FLAG: usize = 0b100;
/// Low two bits of `latest_free`: the slot index itself.
const INDEX_MASK: usize = 0b011;
/// Storage shared between `Writer` and `Reader`: three buffer slots plus the
/// single atomic word used to hand slots back and forth wait-free.
#[derive(Debug)]
struct Shared<T: Sized> {
    // Three slots; at any instant the writer owns one, the reader owns one,
    // and `latest_free` names the third.
    mem: [UnsafeCell<Option<T>>; 3],
    // Low two bits: index of the slot most recently parked here.
    // Bit 2 (`NEW_DATA_FLAG`): set by the writer's publishing swap, cleared
    // when the reader takes the slot. Cache-padded since both sides hammer
    // this word, to avoid false sharing with the slot array.
    latest_free: CachePadded<AtomicUsize>,
}

27impl<T> Shared<T> {
28 fn new() -> Self {
29 Shared {
30 mem: [
31 UnsafeCell::new(None),
32 UnsafeCell::new(None),
33 UnsafeCell::new(None),
34 ],
35 latest_free: CachePadded::new(0.into()),
36 }
37 }
38}
39
40pub fn triple_buffer<T>() -> (Writer<T>, Reader<T>) {
49 let chan = Arc::new(Shared::new());
50
51 let w = Writer::new(chan.clone());
52 let r = Reader::new(chan);
53 (w, r)
54}
55
/// Consuming side of the triple buffer; obtained from [`triple_buffer`].
#[derive(Debug)]
pub struct Reader<T> {
    shared: Arc<Shared<T>>,
    // Slot currently owned by this reader; the writer never touches a slot
    // while the reader holds its index.
    read_idx: usize,
}
// SAFETY: sending a Reader moves the ability to clone `T` values out across
// threads, hence `T: Send`. The slot it dereferences is exclusively held by
// this Reader between swaps. NOTE(review): soundness assumes the single
// writer / single reader usage that `triple_buffer` establishes — confirm no
// other constructor exists elsewhere in the crate.
unsafe impl<T: Send> Send for Reader<T> {}
unsafe impl<T: Send> Sync for Reader<T> {}

impl<T> Reader<T> {
    /// Internal constructor; the reader starts out owning slot 1 (see
    /// `triple_buffer`: slot 0 is parked in `latest_free`, slot 2 belongs to
    /// the writer).
    fn new(raw_mem: Arc<Shared<T>>) -> Self {
        Reader {
            shared: raw_mem,
            read_idx: 1,
        }
    }

    /// Return a clone of the most recently published value, or `None` if this
    /// reader's slot is still empty (i.e. nothing has ever been written).
    ///
    /// Wait-free: one atomic load plus at most one atomic swap. If the writer
    /// has published nothing new since the last call, the previously fetched
    /// slot is cloned again, so repeated calls can return the same value.
    #[inline]
    pub fn try_read(&mut self) -> Option<T>
    where
        T: Clone,
    {
        // NEW_DATA_FLAG is set by the writer's publishing swap; this Acquire
        // pairs with the Release half of that swap so the slot contents it
        // advertises are visible to us.
        let has_new_data = self.shared.latest_free.load(Ordering::Acquire) & NEW_DATA_FLAG > 0;
        if has_new_data {
            // Trade our current slot for the freshly published one. We store
            // `read_idx` with the flag bit clear (marking the data consumed)
            // and mask the received word down to its index bits.
            self.read_idx = self
                .shared
                .latest_free
                .swap(self.read_idx, Ordering::AcqRel)
                & INDEX_MASK;
        }

        // SAFETY: `read_idx` is exclusively owned by this reader until the
        // next swap above, so nothing can be writing this slot concurrently.
        #[cfg(loom)]
        let val = unsafe { self.shared.mem[self.read_idx].get().deref() }.clone();
        #[cfg(not(loom))]
        let val = unsafe { &*self.shared.mem[self.read_idx].get() }.clone();
        val
    }
}

/// Producing side of the triple buffer; obtained from [`triple_buffer`].
#[derive(Debug)]
pub struct Writer<T> {
    shared: Arc<Shared<T>>,
    // Raw word last received from `latest_free`; may still carry
    // NEW_DATA_FLAG, so it is masked with INDEX_MASK before use as an index.
    write_idx: usize,
    // Index of the slot holding the most recently written value, or `None`
    // before the first `write`; used by `Writer::try_read`.
    last_written: Option<usize>,
}
// SAFETY: sending a Writer moves `T` values across threads, hence `T: Send`.
// The slot it writes is exclusively held by this Writer between swaps.
// NOTE(review): as with Reader, soundness assumes the single-writer usage
// that `triple_buffer` establishes.
unsafe impl<T: Send> Send for Writer<T> {}
unsafe impl<T: Send> Sync for Writer<T> {}

impl<T> Writer<T> {
    /// Internal constructor; the writer starts out owning slot 2 (see
    /// `triple_buffer` for the full initial slot assignment).
    fn new(raw_mem: Arc<Shared<T>>) -> Self {
        Writer {
            shared: raw_mem,
            write_idx: 2,
            last_written: None,
        }
    }

    /// Re-read a clone of the value most recently passed to `write`, or
    /// `None` if nothing has been written yet.
    ///
    /// The slot named by `last_written` may since have been handed to the
    /// reader via `latest_free`, but neither side ever writes a slot it does
    /// not currently own, so the contents stay unmodified until this writer
    /// re-acquires that slot in a later `write`.
    #[inline]
    pub fn try_read(&mut self) -> Option<T>
    where
        T: Clone,
    {
        let last_written = self.last_written?;

        // SAFETY: no one writes `last_written` between the `write` that set
        // it and the moment this writer re-acquires the slot; concurrent
        // readers only clone.
        #[cfg(loom)]
        let val = unsafe { self.shared.mem[last_written].get().deref() }.clone();
        #[cfg(not(loom))]
        let val = unsafe { &*self.shared.mem[last_written].get() }.clone();
        val
    }

    /// Publish `data`, replacing any previous value the reader has not yet
    /// consumed. Wait-free: one atomic swap.
    #[inline]
    pub fn write(&mut self, data: T) {
        // Store into the slot we own. `write_idx` is the raw word taken from
        // `latest_free` and may still carry NEW_DATA_FLAG, so mask it down to
        // the index bits before indexing.
        #[cfg(loom)]
        unsafe {
            self.shared.mem[self.write_idx & INDEX_MASK]
                .get_mut()
                .with(|ptr| {
                    let _ = ptr.replace(Some(data));
                });
        }
        // SAFETY: this writer exclusively owns `write_idx & INDEX_MASK` until
        // the publishing swap below; the slot's previous value is dropped
        // here by `replace`.
        #[cfg(not(loom))]
        let _ = unsafe {
            self.shared.mem[self.write_idx & INDEX_MASK]
                .get()
                .replace(Some(data))
        };

        // Remember where this value lives for `Writer::try_read`.
        self.last_written = Some(self.write_idx & INDEX_MASK);
        // Publish: the Release half of this AcqRel swap makes the slot
        // contents above visible to the reader's Acquire; the word received
        // back (the old free slot) becomes our next write target.
        self.write_idx = self
            .shared
            .latest_free
            .swap(self.write_idx | NEW_DATA_FLAG, Ordering::AcqRel);
    }
}

#[cfg(test)]
mod test {
    use super::*;

    /// Writer and reader both observe a freshly written value.
    #[test]
    fn smoke() {
        let (mut w, mut r) = triple_buffer();
        w.write(vec![0; 15]);

        assert_eq!(w.try_read(), Some(vec![0; 15]));
        assert_eq!(r.try_read(), Some(vec![0; 15]));
    }

    /// Before the first write both sides report `None`; afterwards the
    /// reader sees the value.
    #[test]
    fn test_read_none() {
        let (mut w, mut r) = triple_buffer();
        assert_eq!(w.try_read(), None);
        assert_eq!(r.try_read(), None);
        w.write(vec![0; 15]);
        assert_eq!(r.try_read(), Some(vec![0; 15]));
    }

    /// With several unconsumed writes the reader gets only the newest value
    /// (slot indices cycle through `latest_free`), and re-reads it on
    /// subsequent calls.
    #[test]
    fn reader_sees_latest_write() {
        let (mut w, mut r) = triple_buffer();
        for i in 0..10 {
            w.write(i);
        }
        assert_eq!(r.try_read(), Some(9));
        assert_eq!(r.try_read(), Some(9));
        assert_eq!(w.try_read(), Some(9));
    }
}
180}