reft_light/write.rs
use crate::read::ReadHandle;
use crate::Apply;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
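///
/// # Example
///
/// A minimal sketch of the intended flow; the `Counter` and `AddOp` types here are
/// illustrative stand-ins, not part of this crate:
///
/// ```
/// use reft_light::WriteHandle;
///
/// struct Counter(i32);
/// struct AddOp(i32);
///
/// impl reft_light::Apply<Counter, ()> for AddOp {
///     fn apply_first(&mut self, target: &mut Counter, _other: &Counter, _aux: &mut ()) {
///         target.0 += self.0;
///     }
/// }
///
/// fn update(w: &mut WriteHandle<AddOp, Counter, ()>) {
///     // enqueue two operations; readers cannot see them yet
///     w.append(AddOp(1)).append(AddOp(2));
///     // expose both operations to readers in one atomic swap
///     w.publish();
/// }
/// ```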
pub struct WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    auxiliary: A,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<O, T, A> Send for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
    T: Send,
    O: Send,
    A: Send,
    ReadHandle<T>: Send,
{
}

impl<O, T, A> fmt::Debug for WriteHandle<O, T, A>
where
    O: Apply<T, A> + fmt::Debug,
    A: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("auxiliary", &self.auxiliary)
            .finish()
    }
}

impl<O, T, A> Drop for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    fn drop(&mut self) {
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        if self.swap_index != self.oplog.len() {
            self.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next, we free the r_handle copy as well.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(r_handle) });
    }
}

impl<O, T, A> WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    pub(crate) fn new(
        w_handle: T,
        epochs: crate::Epochs,
        r_handle: ReadHandle<T>,
        auxiliary: A,
    ) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            auxiliary,
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even when we last read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // the reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // the reader may not have seen the swap;
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Publish all operations appended to the log so that they become visible to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
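    ///
    /// A short sketch of the visibility rule (using the illustrative `Counter`/`AddOp`
    /// types from the [`WriteHandle`] example):
    ///
    /// ```
    /// # use reft_light::WriteHandle;
    /// # struct Counter(i32);
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<Counter, ()> for AddOp {
    /// #     fn apply_first(&mut self, target: &mut Counter, _: &Counter, _: &mut ()) {
    /// #         target.0 += self.0;
    /// #     }
    /// # }
    /// fn publish_and_read(w: &mut WriteHandle<AddOp, Counter, ()>) -> i32 {
    ///     w.append(AddOp(1));
    ///     // a read here would not yet observe AddOp(1)
    ///     w.publish();
    ///     // after publish, reads through the handle observe the operation
    ///     w.enter().unwrap().0
    /// }
    /// ```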
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        // all the readers have left!
        // safety: we haven't freed the Box, and no readers are accessing the w_handle
        let w_handle = unsafe { self.w_handle.as_mut() };

        // safety: we will not swap while we hold this reference
        let r_handle = unsafe {
            self.r_handle
                .inner
                .load(Ordering::Acquire)
                .as_ref()
                .unwrap()
        };

        // the w_handle copy has not seen any of the writes in the oplog
        // the r_handle copy has not seen any of the writes following swap_index
        if self.swap_index != 0 {
            // we can drain out the operations that only the w_handle copy needs
            //
            // NOTE: the if above is because drain(0..0) would remove 0 elements
            for op in self.oplog.drain(0..self.swap_index) {
                O::apply_second(op, r_handle, w_handle, &mut self.auxiliary);
            }
        }
        // we cannot give owned operations to apply_first
        // since they'll also be needed by the r_handle copy
        for op in self.oplog.iter_mut() {
            O::apply_first(op, w_handle, r_handle, &mut self.auxiliary);
        }
        // the w_handle copy is about to become the r_handle, and can ignore the oplog
        self.swap_index = self.oplog.len();
        // w_handle (the old r_handle) is now fully up to date!

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the two
    /// copies of the data. This method will only do so if there are pending operations.
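    ///
    /// A small sketch of the difference (illustrative `Counter`/`AddOp` types as in the
    /// [`WriteHandle`] example):
    ///
    /// ```
    /// # use reft_light::WriteHandle;
    /// # struct Counter(i32);
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<Counter, ()> for AddOp {
    /// #     fn apply_first(&mut self, target: &mut Counter, _: &Counter, _: &mut ()) {
    /// #         target.0 += self.0;
    /// #     }
    /// # }
    /// fn maybe_publish(w: &mut WriteHandle<AddOp, Counter, ()>) {
    ///     // if nothing is pending, flush() returns without waiting for readers
    ///     w.flush();
    ///     w.append(AddOp(1));
    ///     // now one operation is pending, so this flush() behaves like publish()
    ///     w.flush();
    /// }
    /// ```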
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a reference to the auxiliary data structure.
    pub fn auxiliary(&self) -> &A {
        &self.auxiliary
    }

    /// Returns a mutable reference to the auxiliary data structure.
    pub fn auxiliary_mut(&mut self) -> &mut A {
        &mut self.auxiliary
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all pending operations are applied and waits until all read handles
    /// have departed. It then drops one of the two copies of the data and returns the other
    /// copy as a `Box`.
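    ///
    /// A sketch of a typical teardown (illustrative `Counter`/`AddOp` types as in the
    /// [`WriteHandle`] example):
    ///
    /// ```
    /// # use reft_light::WriteHandle;
    /// # struct Counter(i32);
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<Counter, ()> for AddOp {
    /// #     fn apply_first(&mut self, target: &mut Counter, _: &Counter, _: &mut ()) {
    /// #         target.0 += self.0;
    /// #     }
    /// # }
    /// fn shutdown(mut w: WriteHandle<AddOp, Counter, ()>) -> i32 {
    ///     w.append(AddOp(1));
    ///     // take() publishes the pending operation, waits for all readers to
    ///     // depart, and hands back the surviving copy of the data
    ///     w.take().0
    /// }
    /// ```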
    pub fn take(self) -> Box<T> {
        use std::mem;
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        let mut this = mem::ManuallyDrop::new(self);
        if this.swap_index != this.oplog.len() {
            this.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = this.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart.
        // we need to make sure that the lock is released before we drop the w_handle
        // to prevent a deadlock if a reader tries to acquire the lock on drop
        {
            let epochs = Arc::clone(&this.epochs);
            let mut epochs = epochs.lock().unwrap();
            this.wait(&mut epochs);
        }

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(this.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        // drop the remaining fields, which ManuallyDrop would otherwise leak
        unsafe { ptr::drop_in_place(&mut this.epochs) };
        unsafe { ptr::drop_in_place(&mut this.oplog) };
        unsafe { ptr::drop_in_place(&mut this.r_handle) };
        unsafe { ptr::drop_in_place(&mut this.last_epochs) };
        #[cfg(test)]
        unsafe {
            ptr::drop_in_place(&mut this.is_waiting)
        };

        // return the boxed r_handle
        boxed_r_handle
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<O, T, A> Deref for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<O, T, A> Extend<O> for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
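    ///
    /// For instance (illustrative `Counter`/`AddOp` types as in the [`WriteHandle`] example):
    ///
    /// ```
    /// # use reft_light::WriteHandle;
    /// # struct Counter(i32);
    /// # struct AddOp(i32);
    /// # impl reft_light::Apply<Counter, ()> for AddOp {
    /// #     fn apply_first(&mut self, target: &mut Counter, _: &Counter, _: &mut ()) {
    /// #         target.0 += self.0;
    /// #     }
    /// # }
    /// fn add_many(w: &mut WriteHandle<AddOp, Counter, ()>) {
    ///     // enqueue several operations at once; they stay hidden until publish()
    ///     w.extend((1..=3).map(AddOp));
    /// }
    /// ```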
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        self.oplog.extend(ops);
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use reft_light::WriteHandle;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// As long as the inner types allow that, of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for Rc<()> {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Rc<()>, Data, ()>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Apply;
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 1);
        w.publish();
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 3);
    }

    #[test]
    fn take_test() {
        // publish twice, then take with no pending operations
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice, then a pending operation published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // a normal publish, then pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let w = crate::new::<CounterAddOp, _, _>(2, ());
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Case 1: the epochs are the default (empty).
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait returns immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and the same as in
        // last_epochs) when wait is called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure that the writer has entered wait(); only then update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait only returns after the held epoch has
        // been incremented.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        let r = w.clone();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // a publish would hang here, but these checks do not require one
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Writes always go through the oplog, so before the first append
        // nothing is pending; a publish with an empty oplog still swaps the
        // copies and counts as a refresh.
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
}