reft_light/
write.rs

use crate::read::ReadHandle;
use crate::Apply;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
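///
/// # Example
///
/// A minimal sketch of the append/publish flow. `Add` is a hypothetical operation type, and the
/// crate-level `new` constructor is assumed to be the one this file's tests use; only
/// `apply_first` is implemented, relying on the trait's default for the second application:
///
/// ```
/// struct Add(i32);
/// impl reft_light::Apply<i32, ()> for Add {
///     fn apply_first(&mut self, first: &mut i32, _second: &i32, _aux: &mut ()) {
///         *first += self.0;
///     }
/// }
///
/// let mut w = reft_light::new::<Add, _, _>(0, ());
/// w.append(Add(1));
/// // reads go through the `ReadHandle` deref target, so the un-published
/// // append is not yet visible, even to the writer:
/// assert_eq!(*w.enter().unwrap(), 0);
/// w.publish();
/// assert_eq!(*w.enter().unwrap(), 1);
/// ```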
pub struct WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    auxiliary: A,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<O, T, A> Send for WriteHandle<O, T, A>
where
    O: Apply<T, A> + Send,
    T: Send,
    A: Send,
    ReadHandle<T>: Send,
{
}

impl<O, T, A> fmt::Debug for WriteHandle<O, T, A>
where
    O: Apply<T, A> + fmt::Debug,
    A: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("auxiliary", &self.auxiliary)
            .finish()
    }
}

impl<O, T, A> Drop for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    fn drop(&mut self) {
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        if self.swap_index != self.oplog.len() {
            self.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and drop it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(r_handle) });
    }
}

impl<O, T, A> WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    pub(crate) fn new(
        w_handle: T,
        epochs: crate::Epochs,
        r_handle: ReadHandle<T>,
        auxiliary: A,
    ) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            auxiliary,
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // the reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // the reader may not have seen the swap;
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Publish all operations appended to the log to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
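    ///
    /// A sketch of the visibility guarantee this provides (using a hypothetical `Add` operation
    /// and the crate-level `new` constructor that this file's tests use):
    ///
    /// ```
    /// # struct Add(i32);
    /// # impl reft_light::Apply<i32, ()> for Add {
    /// #     fn apply_first(&mut self, first: &mut i32, _: &i32, _: &mut ()) { *first += self.0; }
    /// # }
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// // `WriteHandle` derefs to `ReadHandle`, which can be cloned.
    /// let r = w.clone();
    /// w.append(Add(5));
    /// assert_eq!(*r.enter().unwrap(), 0); // not visible yet
    /// w.publish();
    /// assert_eq!(*r.enter().unwrap(), 5); // visible after publish
    /// ```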
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        // all the readers have left!
        // safety: we haven't freed the Box, and no readers are accessing the w_handle
        let w_handle = unsafe { self.w_handle.as_mut() };

        // safety: we will not swap while we hold this reference
        let r_handle = unsafe {
            self.r_handle
                .inner
                .load(Ordering::Acquire)
                .as_ref()
                .unwrap()
        };

        // the w_handle copy has not seen any of the writes in the oplog
        // the r_handle copy has not seen any of the writes following swap_index
        if self.swap_index != 0 {
            // we can drain out the operations that only the w_handle copy needs
            //
            // NOTE: the if above is because drain(0..0) would remove 0 elements
            for op in self.oplog.drain(0..self.swap_index) {
                O::apply_second(op, r_handle, w_handle, &mut self.auxiliary);
            }
        }
        // we cannot give owned operations to apply_first
        // since they'll also be needed by the r_handle copy
        for op in self.oplog.iter_mut() {
            O::apply_first(op, w_handle, r_handle, &mut self.auxiliary);
        }
        // the w_handle copy is about to become the r_handle, and can ignore the oplog
        self.swap_index = self.oplog.len();
        // w_handle (the old r_handle) is now fully up to date!

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the data
    /// copies. This method will only do so if there are pending operations.
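    ///
    /// A sketch of the difference (hypothetical `Add` operation, as in the `WriteHandle` docs):
    ///
    /// ```
    /// # struct Add(i32);
    /// # impl reft_light::Apply<i32, ()> for Add {
    /// #     fn apply_first(&mut self, first: &mut i32, _: &i32, _: &mut ()) { *first += self.0; }
    /// # }
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// w.flush(); // nothing pending, so no publish (and no waiting) happens
    /// w.append(Add(1));
    /// assert!(w.has_pending_operations());
    /// w.flush(); // a pending operation: this publishes
    /// assert!(!w.has_pending_operations());
    /// ```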
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
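    ///
    /// A sketch of the chaining the `&mut Self` return value enables (hypothetical `Add`
    /// operation, as in the `WriteHandle` docs):
    ///
    /// ```
    /// # struct Add(i32);
    /// # impl reft_light::Apply<i32, ()> for Add {
    /// #     fn apply_first(&mut self, first: &mut i32, _: &i32, _: &mut ()) { *first += self.0; }
    /// # }
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// w.append(Add(1)).append(Add(2)).publish();
    /// assert_eq!(*w.enter().unwrap(), 3);
    /// ```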
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a reference to the auxiliary data structure.
    pub fn auxiliary(&self) -> &A {
        &self.auxiliary
    }

    /// Returns a mutable reference to the auxiliary data structure.
    pub fn auxiliary_mut(&mut self) -> &mut A {
        &mut self.auxiliary
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all pending operations are applied, and waits until all read handles have
    /// departed. It then drops one of the two copies of the data and returns the other copy as a
    /// `Box<T>`.
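    ///
    /// A sketch (hypothetical `Add` operation, as in the `WriteHandle` docs; mirrors the
    /// `take_test` below):
    ///
    /// ```
    /// # struct Add(i32);
    /// # impl reft_light::Apply<i32, ()> for Add {
    /// #     fn apply_first(&mut self, first: &mut i32, _: &i32, _: &mut ()) { *first += self.0; }
    /// # }
    /// let mut w = reft_light::new::<Add, _, _>(2, ());
    /// w.append(Add(1));
    /// // `take` publishes the pending operation, waits for readers to
    /// // depart, and returns the remaining copy of the data.
    /// assert_eq!(*w.take(), 3);
    /// ```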
    pub fn take(self) -> Box<T> {
        use std::mem;
        use std::ptr;
        // first, ensure the read handle is up-to-date with all operations
        let mut this = mem::ManuallyDrop::new(self);
        if this.swap_index != this.oplog.len() {
            this.publish();
        }

        // next, grab the read handle and set it to NULL
        let r_handle = this.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart.
        // we need to make sure that the lock is released before we drop the w_handle
        // to prevent a deadlock if a reader tries to acquire the lock on drop
        {
            let epochs = Arc::clone(&this.epochs);
            let mut epochs = epochs.lock().unwrap();
            this.wait(&mut epochs);
        }

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to the r_handle.
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        drop(unsafe { Box::from_raw(this.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        // drop the other fields
        unsafe { ptr::drop_in_place(&mut this.epochs) };
        unsafe { ptr::drop_in_place(&mut this.oplog) };
        unsafe { ptr::drop_in_place(&mut this.r_handle) };
        unsafe { ptr::drop_in_place(&mut this.last_epochs) };
        #[cfg(test)]
        unsafe {
            ptr::drop_in_place(&mut this.is_waiting)
        };

        // return the boxed r_handle
        boxed_r_handle
    }
}

// allow using the write handle for reads
use std::ops::Deref;
impl<O, T, A> Deref for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<O, T, A> Extend<O> for WriteHandle<O, T, A>
where
    O: Apply<T, A>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
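    ///
    /// A sketch (hypothetical `Add` operation, as in the `WriteHandle` docs):
    ///
    /// ```
    /// # struct Add(i32);
    /// # impl reft_light::Apply<i32, ()> for Add {
    /// #     fn apply_first(&mut self, first: &mut i32, _: &i32, _: &mut ()) { *first += self.0; }
    /// # }
    /// let mut w = reft_light::new::<Add, _, _>(0, ());
    /// w.extend(vec![Add(1), Add(2), Add(3)]);
    /// w.publish();
    /// assert_eq!(*w.enter().unwrap(), 6);
    /// ```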
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        self.oplog.extend(ops);
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use reft_light::WriteHandle;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// As long as the inner types allow that, of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl reft_light::Apply<Data, ()> for Rc<()> {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Rc<()>, Data, ()>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use reft_light::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl reft_light::Apply<Data, ()> for () {
///     fn apply_first(&mut self, _: &mut Data, _: &Data, _: &mut ()) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<(), Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Apply;
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 1);
        w.publish();
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 3);
    }

    #[test]
    fn take_test() {
        // publish twice, then take with no pending operations
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice, then a pending operation published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish, then pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let mut w = crate::new::<CounterAddOp, _, _>(2, ());
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let w = crate::new::<CounterAddOp, _, _>(2, ());
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Case 1: the epochs list is empty (the default).
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait returns immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and unchanged from
        // last_epochs) when wait is called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure the writer's wait() is running before we update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait only returns after the held epoch has
        // been incremented.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());
        let r = w.clone();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // a publish would hang here, but checking for pending operations must not
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let mut w = crate::new::<CounterAddOp, _, _>(0, ());

        // Nothing is pending until we append something, and a publish with
        // nothing pending still swaps and counts as a refresh.
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
}