// left_right/read.rs

use crate::sync::{fence, Arc, AtomicPtr, AtomicUsize, Ordering};
use crossbeam_utils::CachePadded;
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use std::ptr::NonNull;

// To make [`WriteHandle`] and friends work.
#[cfg(doc)]
use crate::WriteHandle;

mod guard;
pub use guard::ReadGuard;

mod factory;
pub use factory::ReadHandleFactory;
17
/// A read handle to a left-right guarded data structure.
///
/// To use a handle, first call [`enter`](Self::enter) to acquire a [`ReadGuard`]. This is similar
/// to acquiring a `Mutex`, except that no exclusive lock is taken. All reads of the underlying
/// data structure can then happen through the [`ReadGuard`] (which implements `Deref<Target =
/// T>`).
///
/// Reads through a `ReadHandle` only see the changes up until the last time
/// [`WriteHandle::publish`] was called. That is, even if a writer performs a number of
/// modifications to the underlying data, those changes are not visible to reads until the writer
/// calls [`publish`](crate::WriteHandle::publish).
///
/// `ReadHandle` is not `Sync`, which means that you cannot share a `ReadHandle` across many
/// threads. This is because the coordination necessary to do so would significantly hamper the
/// scalability of reads. If you had many reads go through one `ReadHandle`, they would need to
/// coordinate among themselves for every read, which would lead to core contention and poor
/// multi-core performance. By having `ReadHandle` not be `Sync`, you are forced to keep a
/// `ReadHandle` per reader, which guarantees that you do not accidentally ruin your performance.
///
/// You can create a new, independent `ReadHandle` either by cloning an existing handle or by using
/// a [`ReadHandleFactory`]. Note, however, that creating a new handle through either of these
/// mechanisms _does_ take a lock, and may therefore become a bottleneck if you do it frequently.
pub struct ReadHandle<T> {
    // Shared pointer to the current read copy of the data; the writer swaps it on publish, and
    // stores null once it (and both copies) have been dropped.
    pub(crate) inner: Arc<AtomicPtr<T>>,
    // Registry of all readers' epoch counters, shared with the writer so it can wait for readers.
    pub(crate) epochs: crate::Epochs,
    // This reader's own epoch counter. Cache-padded so that distinct readers' counters do not
    // share a cache line.
    epoch: Arc<CachePadded<AtomicUsize>>,
    // Slot index of `epoch` within `epochs`; used to deregister the counter on drop.
    epoch_i: usize,
    // Number of `ReadGuard`s currently outstanding from this handle (guards are re-entrant).
    enters: Cell<usize>,

    // `ReadHandle` is _only_ Send if T is Sync. If T is !Sync, then it's not okay for us to expose
    // references to it to other threads! Since negative impls are not available on stable, we pull
    // this little hack to make the type not auto-impl Send, and then explicitly add the impl when
    // appropriate.
    _unimpl_send: PhantomData<*const T>,
}
unsafe impl<T> Send for ReadHandle<T> where T: Sync {}
54
55impl<T> Drop for ReadHandle<T> {
56 fn drop(&mut self) {
57 // epoch must already be even for us to have &mut self,
58 // so okay to lock since we're not holding up the epoch anyway.
59 let e = self.epochs.lock().unwrap().remove(self.epoch_i);
60 assert!(Arc::ptr_eq(&e, &self.epoch));
61 assert_eq!(self.enters.get(), 0);
62 }
63}
64
65impl<T> fmt::Debug for ReadHandle<T> {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_struct("ReadHandle")
68 .field("epochs", &self.epochs)
69 .field("epoch", &self.epoch)
70 .finish()
71 }
72}
73
74impl<T> Clone for ReadHandle<T> {
75 fn clone(&self) -> Self {
76 ReadHandle::new_with_arc(Arc::clone(&self.inner), Arc::clone(&self.epochs))
77 }
78}
79
80impl<T> ReadHandle<T> {
81 pub(crate) fn new(inner: T, epochs: crate::Epochs) -> Self {
82 let store = Box::into_raw(Box::new(inner));
83 let inner = Arc::new(AtomicPtr::new(store));
84 Self::new_with_arc(inner, epochs)
85 }
86
87 fn new_with_arc(inner: Arc<AtomicPtr<T>>, epochs: crate::Epochs) -> Self {
88 // tell writer about our epoch tracker
89 let epoch = Arc::new(CachePadded::new(AtomicUsize::new(0)));
90 // okay to lock, since we're not holding up the epoch
91 let epoch_i = epochs.lock().unwrap().insert(Arc::clone(&epoch));
92
93 Self {
94 epochs,
95 epoch,
96 epoch_i,
97 enters: Cell::new(0),
98 inner,
99 _unimpl_send: PhantomData,
100 }
101 }
102
103 /// Create a [`ReadHandleFactory`] which is `Send` & `Sync` and can be shared across threads to create
104 /// additional [`ReadHandle`] instances.
105 pub fn factory(&self) -> ReadHandleFactory<T> {
106 ReadHandleFactory {
107 inner: Arc::clone(&self.inner),
108 epochs: Arc::clone(&self.epochs),
109 }
110 }
111}
112
impl<T> ReadHandle<T> {
    /// Take out a guarded live reference to the read copy of the `T`.
    ///
    /// While the guard lives, the [`WriteHandle`] cannot proceed with a call to
    /// [`WriteHandle::publish`], so no queued operations will become visible to _any_ reader.
    ///
    /// If the `WriteHandle` has been dropped, this function returns `None`.
    ///
    /// Re-entrant: calling `enter` while a guard from this same handle is still alive takes a
    /// fast path that only bumps a local counter — the epoch is already held.
    pub fn enter(&self) -> Option<ReadGuard<'_, T>> {
        let enters = self.enters.get();
        if enters != 0 {
            // We have already locked the epoch.
            // Just give out another guard.
            let r_handle = self.inner.load(Ordering::Acquire);
            // since we previously bumped our epoch, this pointer will remain valid until we bump
            // it again, which only happens when the last ReadGuard is dropped.
            let r_handle = unsafe { r_handle.as_ref() };

            return if let Some(r_handle) = r_handle {
                self.enters.set(enters + 1);
                Some(ReadGuard {
                    handle: guard::ReadHandleState::from(self),
                    t: r_handle,
                })
            } else {
                // A null pointer means the writer is gone, but the writer cannot have been
                // dropped while we still hold a guard — so this branch is a broken invariant.
                unreachable!("if pointer is null, no ReadGuard should have been issued");
            };
        }

        // once we update our epoch, the writer can no longer do a swap until we set the MSB to
        // indicate that we've finished our read. however, we still need to deal with the case of a
        // race between when the writer reads our epoch and when they decide to make the swap.
        //
        // assume that there is a concurrent writer. it just swapped the atomic pointer from A to
        // B. the writer wants to modify A, and needs to know if that is safe. we can be in any of
        // the following cases when we atomically swap out our epoch:
        //
        // 1. the writer has read our previous epoch twice
        // 2. the writer has already read our previous epoch once
        // 3. the writer has not yet read our previous epoch
        //
        // let's discuss each of these in turn.
        //
        // 1. since writers assume they are free to proceed if they read an epoch with MSB set
        //    twice in a row, this is equivalent to case (2) below.
        // 2. the writer will see our epoch change, and so will assume that we have read B. it
        //    will therefore feel free to modify A. note that *another* pointer swap can happen,
        //    back to A, but then the writer would be block on our epoch, and so cannot modify
        //    A *or* B. consequently, using a pointer we read *after* the epoch swap is definitely
        //    safe here.
        // 3. the writer will read our epoch, notice that MSB is not set, and will keep reading,
        //    continuing to observe that it is still not set until we finish our read. thus,
        //    neither A nor B are being modified, and we can safely use either.
        //
        // in all cases, using a pointer we read *after* updating our epoch is safe.

        // so, update our epoch tracker.
        self.epoch.fetch_add(1, Ordering::AcqRel);

        // ensure that the pointer read happens strictly after updating the epoch
        fence(Ordering::SeqCst);

        // then, atomically read pointer, and use the copy being pointed to
        let r_handle = self.inner.load(Ordering::Acquire);

        // since we bumped our epoch, this pointer will remain valid until we bump it again
        let r_handle = unsafe { r_handle.as_ref() };

        if let Some(r_handle) = r_handle {
            // add a guard to ensure we restore read parity even if we panic
            let enters = self.enters.get() + 1;
            self.enters.set(enters);
            Some(ReadGuard {
                handle: guard::ReadHandleState::from(self),
                t: r_handle,
            })
        } else {
            // the writehandle has been dropped, and so has both copies,
            // so restore parity and return None
            self.epoch.fetch_add(1, Ordering::AcqRel);
            None
        }
    }

    /// Returns true if the [`WriteHandle`] has been dropped.
    pub fn was_dropped(&self) -> bool {
        // The writer leaves a null pointer behind once it (and both copies) are gone.
        self.inner.load(Ordering::Acquire).is_null()
    }

    /// Returns a raw pointer to the read copy of the data.
    ///
    /// Note that it is only safe to read through this pointer if you _know_ that the writer will
    /// not start writing into it. This is most likely only the case if you are calling this method
    /// from inside a method that holds `&mut WriteHandle`.
    ///
    /// Casting this pointer to `&mut` is never safe.
    ///
    /// Returns `None` if the `WriteHandle` has been dropped (i.e. the pointer is null).
    pub fn raw_handle(&self) -> Option<NonNull<T>> {
        NonNull::new(self.inner.load(Ordering::Acquire))
    }
}
212
// NOTE: the doc comments below are compile-time tests (`compile_fail` doctests plus one that
// must compile) pinning the Send/!Sync auto-trait behavior of `ReadHandle`; the struct itself
// exists only to carry them.
/// `ReadHandle` cannot be shared across threads:
///
/// ```compile_fail
/// use left_right::ReadHandle;
///
/// fn is_sync<T: Sync>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// // the line below will not compile as ReadHandle does not implement Sync
///
/// is_sync::<ReadHandle<u64>>()
/// ```
///
/// But, it can be sent across threads:
///
/// ```
/// use left_right::ReadHandle;
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<ReadHandle<u64>>()
/// ```
///
/// As long as the wrapped type is `Sync` that is.
///
/// ```compile_fail
/// use left_right::ReadHandle;
///
/// fn is_send<T: Send>() {}
///
/// is_send::<ReadHandle<std::cell::Cell<u64>>>()
/// ```
#[allow(dead_code)]
struct CheckReadHandleSendNotSync;