reft_light/write.rs

use crate::read::ReadHandle;
use crate::Apply;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
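///
/// # Example
///
/// A rough sketch of the append/publish cycle. The `Add` op and the crate-root
/// `new(initial, auxiliary)` constructor below are illustrative, mirroring the ones used in this
/// module's tests:
///
/// ```ignore
/// // An op that adds its value to the wrapped counter.
/// struct Add(i32);
/// impl reft_light::Apply<i32, ()> for Add {
///     fn apply_first(&mut self, first: &mut i32, _other: &i32, _aux: &mut ()) {
///         *first += self.0;
///     }
/// }
///
/// let mut w = reft_light::new::<Add, _, _>(0, ());
/// w.append(Add(40)).append(Add(2));
/// // Reads go through the `ReadHandle` deref; nothing is visible yet.
/// assert_eq!(*w.enter().unwrap(), 0);
/// w.publish();
/// assert_eq!(*w.enter().unwrap(), 42);
/// ```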
pub struct WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    auxiliary: A,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<O, T, A> Send for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
    T: Send,
    O: Send,
    A: Send,
    ReadHandle<T>: Send,
{
}

impl<O, T, A> fmt::Debug for WriteHandle<O, T, A>
where
    O: Apply<T, A> + fmt::Debug,
    A: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("auxiliary", &self.auxiliary)
            .finish()
    }
}

impl<O, T, A> Drop for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    fn drop(&mut self) {
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        if self.swap_index != self.oplog.len() {
            self.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we reconstruct the r_handle as a `Box` and drop it.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(r_handle) });
    }
}

impl<O, T, A> WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    pub(crate) fn new(
        w_handle: T,
        epochs: crate::Epochs,
        r_handle: ReadHandle<T>,
        auxiliary: A,
    ) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            auxiliary,
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Publish all operations appended to the log so that they become visible to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
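    ///
    /// # Example
    ///
    /// A rough sketch of the visibility guarantee. The `Add` op and the crate-root
    /// `new(initial, auxiliary)` constructor are illustrative, mirroring this module's tests:
    ///
    /// ```ignore
    /// struct Add(i32);
    /// impl reft_light::Apply<i32, ()> for Add {
    ///     fn apply_first(&mut self, first: &mut i32, _other: &i32, _aux: &mut ()) {
    ///         *first += self.0;
    ///     }
    /// }
    ///
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// let r = w.clone(); // a `ReadHandle<i32>` obtained through `Deref`
    /// w.append(Add(42));
    /// // the reader still sees the old value...
    /// assert_eq!(*r.enter().unwrap(), 0);
    /// // ...until we publish.
    /// w.publish();
    /// assert_eq!(*r.enter().unwrap(), 42);
    /// ```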
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        // all the readers have left!
        // safety: we haven't freed the Box, and no readers are accessing the w_handle
        let w_handle = unsafe { self.w_handle.as_mut() };

        // safety: we will not swap while we hold this reference
        let r_handle = unsafe {
            self.r_handle
                .inner
                .load(Ordering::Acquire)
                .as_ref()
                .unwrap()
        };

        // the w_handle copy has not seen any of the writes in the oplog
        // the r_handle copy has not seen any of the writes following swap_index
        if self.swap_index != 0 {
            // we can drain out the operations that only the w_handle copy needs
            //
            // NOTE: the `if` above is because drain(0..0) would remove 0 elements
            for op in self.oplog.drain(0..self.swap_index) {
                O::apply_second(op, r_handle, w_handle, &mut self.auxiliary);
            }
        }
        // we cannot give owned operations to apply_first
        // since they'll also be needed by the r_handle copy
        for op in self.oplog.iter_mut() {
            O::apply_first(op, w_handle, r_handle, &mut self.auxiliary);
        }
        // the w_handle copy is about to become the r_handle, and can ignore the oplog
        self.swap_index = self.oplog.len();
        // w_handle (the old r_handle) is now fully up to date!

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle, but
        // not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the copies.
    /// This method will only do so if there are pending operations.
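    ///
    /// # Example
    ///
    /// A small sketch, using the same illustrative `Add` op and crate-root `new` constructor as
    /// in the other examples:
    ///
    /// ```ignore
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// w.flush(); // nothing pending, so this does not publish
    /// w.append(Add(1));
    /// assert!(w.has_pending_operations());
    /// w.flush(); // pending operations, so this publishes
    /// assert!(!w.has_pending_operations());
    /// assert_eq!(*w.enter().unwrap(), 1);
    /// ```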
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all pending operations are applied and waits until all current readers
    /// have departed. Then it drops one of the copies of the data and returns the other copy in
    /// a `Box`.
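    ///
    /// # Example
    ///
    /// A small sketch (the `Add` op and crate-root `new` constructor are illustrative, as in the
    /// other examples):
    ///
    /// ```ignore
    /// let mut w = reft_light::new::<Add, _, _>(2, ());
    /// w.append(Add(1));
    /// // `take` publishes the pending op, waits for readers, and hands back the data.
    /// let data: Box<i32> = w.take();
    /// assert_eq!(*data, 3);
    /// ```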
    pub fn take(self) -> Box<T> {
        use std::mem;
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        let mut this = mem::ManuallyDrop::new(self);
        if this.swap_index != this.oplog.len() {
            this.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = this.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        // we need to make sure that the lock is released before we drop the w_handle
        // to prevent a deadlock if a reader tries to acquire the lock on drop
        {
            let epochs = Arc::clone(&this.epochs);
            let mut epochs = epochs.lock().unwrap();
            this.wait(&mut epochs);
        }

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(this.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        // drop the other fields
        unsafe { ptr::drop_in_place(&mut this.epochs) };
        unsafe { ptr::drop_in_place(&mut this.oplog) };
        unsafe { ptr::drop_in_place(&mut this.r_handle) };
        unsafe { ptr::drop_in_place(&mut this.last_epochs) };
        #[cfg(test)]
        unsafe {
            ptr::drop_in_place(&mut this.is_waiting)
        };

        // return the boxed r_handle
        boxed_r_handle
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<O, T, A> Deref for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<O, T, A> Extend<O> for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
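    ///
    /// A small sketch (illustrative `Add` op and crate-root `new` constructor, as in the
    /// examples on [`WriteHandle`]):
    ///
    /// ```ignore
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// w.extend(vec![Add(1), Add(2), Add(3)]);
    /// w.publish();
    /// assert_eq!(*w.enter().unwrap(), 6);
    /// ```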
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        self.oplog.extend(ops);
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use reft_light::WriteHandle;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for Rc<()> {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Rc<()>, Data, ()>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Apply;
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 1);
        w.publish();
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 3);
    }

    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let w = crate::new::<CounterAddOp, _, _>(2, ());
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Case 1: the epochs list is the default (empty).
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait returns immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and unchanged since
        // last_epochs was recorded) when wait is called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert!(!is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure the writer has entered wait() before we update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait returns only after the held epoch has been incremented.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        let r = w.clone();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // There are no pending operations right after creation, but a publish with an empty
        // oplog still counts as a refresh.
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
}