reft_light/write.rs
use crate::read::ReadHandle;
use crate::Apply;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
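///
/// # Example
///
/// A minimal usage sketch. It is marked `ignore` because it assumes two things that are not shown
/// in this file: the crate-root `new(initial, auxiliary)` constructor that the unit tests below
/// call as `crate::new`, and a hypothetical `AddOp` operation type.
///
/// ```ignore
/// // Hypothetical operation type: adds its payload to the wrapped counter.
/// struct AddOp(i32);
///
/// impl reft_light::Apply<i32, ()> for AddOp {
///     fn apply_first(&mut self, target: &mut i32, _other: &i32, _aux: &mut ()) {
///         *target += self.0;
///     }
/// }
///
/// // Assumed constructor, mirroring `crate::new::<CounterAddOp, _, _>(0, ())` in the tests.
/// let mut w = reft_light::new::<AddOp, _, _>(0, ());
/// let r = w.clone(); // a `ReadHandle<i32>` obtained through `Deref`
///
/// w.append(AddOp(1));
/// w.publish();
///
/// // both dedicated readers and reads through the write handle see the published value
/// assert_eq!(*r.enter().unwrap(), 1);
/// assert_eq!(*w.enter().unwrap(), 1);
/// ```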
pub struct WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    auxiliary: A,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<O, T, A> Send for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
    T: Send,
    O: Send,
    A: Send,
    ReadHandle<T>: Send,
{
}

impl<O, T, A> fmt::Debug for WriteHandle<O, T, A>
where
    O: Apply<T, A> + fmt::Debug,
    A: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("auxiliary", &self.auxiliary)
            .finish()
    }
}

impl<O, T, A> Drop for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    fn drop(&mut self) {
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        if self.swap_index != self.oplog.len() {
            self.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and drop it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(r_handle) });
    }
}

impl<O, T, A> WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    pub(crate) fn new(
        w_handle: T,
        epochs: crate::Epochs,
        r_handle: ReadHandle<T>,
        auxiliary: A,
    ) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            auxiliary,
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // the reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen the swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Publish all operations appended to the log to reads.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
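    ///
    /// # Example
    ///
    /// A sketch of the visibility guarantee (marked `ignore`; it assumes the crate-root
    /// `reft_light::new` constructor used by the tests below and a hypothetical `AddOp` type):
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    /// let r = w.clone();
    ///
    /// w.append(AddOp(5));
    /// w.publish();
    /// assert_eq!(*r.enter().unwrap(), 5);
    ///
    /// w.append(AddOp(1));
    /// // the second append is not yet visible to readers...
    /// assert_eq!(*r.enter().unwrap(), 5);
    /// w.publish();
    /// // ...until it has been published.
    /// assert_eq!(*r.enter().unwrap(), 6);
    /// ```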
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        // all the readers have left!
        // safety: we haven't freed the Box, and no readers are accessing the w_handle
        let w_handle = unsafe { self.w_handle.as_mut() };

        // safety: we will not swap while we hold this reference
        let r_handle = unsafe {
            self.r_handle
                .inner
                .load(Ordering::Acquire)
                .as_ref()
                .unwrap()
        };

        // the w_handle copy has not seen any of the writes in the oplog
        // the r_handle copy has not seen any of the writes following swap_index
        if self.swap_index != 0 {
            // we can drain out the operations that only the w_handle copy needs
            //
            // NOTE: the if above is because drain(0..0) would remove 0 elements
            for op in self.oplog.drain(0..self.swap_index) {
                O::apply_second(op, r_handle, w_handle, &mut self.auxiliary);
            }
        }
        // we cannot give owned operations to apply_first
        // since they'll also be needed by the r_handle copy
        for op in self.oplog.iter_mut() {
            O::apply_first(op, w_handle, r_handle, &mut self.auxiliary);
        }
        // the w_handle copy is about to become the r_handle, and can ignore the oplog
        self.swap_index = self.oplog.len();
        // w_handle (the old r_handle) is now fully up to date!

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the copies.
    /// This method will only do so if there are pending operations.
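    ///
    /// # Example
    ///
    /// A sketch (marked `ignore`; assumes the crate-root `reft_light::new` constructor used by
    /// the tests below and a hypothetical `AddOp` type):
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    ///
    /// w.append(AddOp(1));
    /// w.flush(); // publishes, since an operation is pending
    /// assert_eq!(*w.enter().unwrap(), 1);
    ///
    /// w.flush(); // nothing pending: returns without waiting for readers
    /// ```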
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
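    ///
    /// # Example
    ///
    /// A sketch (marked `ignore`; same assumptions as the examples above), mirroring the
    /// `flush_no_refresh` test below:
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    /// assert!(!w.has_pending_operations());
    ///
    /// w.append(AddOp(1));
    /// assert!(w.has_pending_operations());
    ///
    /// w.publish();
    /// assert!(!w.has_pending_operations());
    /// ```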
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
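    ///
    /// # Example
    ///
    /// Since `append` returns `&mut Self`, calls can be chained (sketch marked `ignore`; same
    /// assumptions as the examples above):
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    /// w.append(AddOp(1)).append(AddOp(2)).publish();
    /// assert_eq!(*w.enter().unwrap(), 3);
    /// ```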
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
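    ///
    /// # Example
    ///
    /// A sketch of the one case the paragraph above describes as okay: mutating before the first
    /// `publish` (marked `ignore`; same assumptions as the examples above). Note that writes made
    /// this way bypass the operational log, so they only end up in one of the two copies unless
    /// you account for that separately.
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    ///
    /// // no reader has ever been inside the write copy yet, so this is sound
    /// let mut p = w.raw_write_handle();
    /// unsafe { *p.as_mut() = 10 };
    /// ```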
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all the pending operations are applied and waits until all readers have
    /// departed. Then it drops one of the copies of the data and returns the other copy in a
    /// `Box`.
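    ///
    /// # Example
    ///
    /// A sketch mirroring the `take_test` below (marked `ignore`; same assumptions as the
    /// examples above):
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(2, ());
    /// w.append(AddOp(1));
    ///
    /// // `take` publishes the pending operation and hands back the data
    /// let boxed: Box<i32> = w.take();
    /// assert_eq!(*boxed, 3);
    /// ```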
    pub fn take(self) -> Box<T> {
        use std::mem;
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        let mut this = mem::ManuallyDrop::new(self);
        if this.swap_index != this.oplog.len() {
            this.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = this.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        // we need to make sure that the lock is released before we drop the w_handle
        // to prevent a deadlock if a reader tries to acquire the lock on drop
        {
            let epochs = Arc::clone(&this.epochs);
            let mut epochs = epochs.lock().unwrap();
            this.wait(&mut epochs);
        }

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(this.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        // drop the other fields
        unsafe { ptr::drop_in_place(&mut this.epochs) };
        unsafe { ptr::drop_in_place(&mut this.oplog) };
        unsafe { ptr::drop_in_place(&mut this.r_handle) };
        unsafe { ptr::drop_in_place(&mut this.last_epochs) };
        #[cfg(test)]
        unsafe {
            ptr::drop_in_place(&mut this.is_waiting)
        };

        // return the boxed r_handle
        boxed_r_handle
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<O, T, A> Deref for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<O, T, A> Extend<O> for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
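    ///
    /// # Example
    ///
    /// A sketch (marked `ignore`; same assumptions as the examples on [`WriteHandle`]):
    ///
    /// ```ignore
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<i32, ()> for AddOp {
    /// #     fn apply_first(&mut self, t: &mut i32, _: &i32, _: &mut ()) { *t += self.0; }
    /// # }
    /// let mut w = reft_light::new::<AddOp, _, _>(0, ());
    /// w.extend(vec![AddOp(1), AddOp(2)]);
    /// w.publish();
    /// assert_eq!(*w.enter().unwrap(), 3);
    /// ```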
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        self.oplog.extend(ops);
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use reft_light::WriteHandle;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for Rc<()> {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Rc<()>, Data, ()>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Apply;
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 1);
        w.publish();
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 3);
    }

    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then a pending operation published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let w = crate::new::<CounterAddOp, _, _>(2, ());
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Case 1: the epoch set is the default (empty) one.
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there is no epoch to wait for, the wait function returns immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and the count is the
        // same as in last_epochs) and wait has been called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure that the writer's wait() is called first; only then update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait must return after the progress/increment
        // of held_epoch.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        let r = w.clone();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle table).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
634}