left_right/write.rs

use crate::read::ReadHandle;
use crate::Absorb;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
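///
/// # Example
///
/// A minimal sketch of the append/publish workflow. The `Counter` and `Op` types here are
/// illustrative, not part of this crate:
///
/// ```
/// #[derive(Clone)]
/// struct Counter(i32);
///
/// enum Op {
///     Add(i32),
/// }
///
/// impl left_right::Absorb<Op> for Counter {
///     fn absorb_first(&mut self, op: &mut Op, _other: &Self) {
///         match op {
///             Op::Add(n) => self.0 += *n,
///         }
///     }
///     fn sync_with(&mut self, first: &Self) {
///         self.0 = first.0;
///     }
/// }
///
/// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
/// w.append(Op::Add(1));
/// // readers never see an operation before it is published
/// assert_eq!(r.enter().unwrap().0, 0);
/// w.publish();
/// assert_eq!(r.enter().unwrap().0, 1);
/// // reads through the write handle are subject to the same visibility rules
/// assert_eq!(w.enter().unwrap().0, 1);
/// ```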
pub struct WriteHandle<T, O>
where
    T: Absorb<O>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
    /// Write directly to the write-handle copy, since no publish has happened.
    first: bool,
    /// A publish has happened, but the two copies have not been synchronized yet.
    second: bool,
    /// If we call `Self::take` the drop needs to be different.
    taken: bool,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<T, O> Send for WriteHandle<T, O>
where
    T: Absorb<O>,
    T: Send,
    O: Send,
    ReadHandle<T>: Send,
{
}

impl<T, O> fmt::Debug for WriteHandle<T, O>
where
    T: Absorb<O> + fmt::Debug,
    O: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("first", &self.first)
            .field("second", &self.second)
            .finish()
    }
}

/// A **smart pointer** to an owned backing data structure. This makes sure that the
/// data is dropped correctly (using [`Absorb::drop_second`]).
///
/// Additionally, it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
pub struct Taken<T: Absorb<O>, O> {
    inner: Option<Box<T>>,
    _marker: PhantomData<O>,
}

impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Taken")
            .field(
                "inner",
                self.inner
                    .as_ref()
                    .expect("inner is only taken in `into_box` which drops self"),
            )
            .finish()
    }
}

impl<T: Absorb<O>, O> Deref for Taken<T, O> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.inner
            .as_ref()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner
            .as_mut()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> Taken<T, O> {
    /// # Safety
    ///
    /// You must call [`Absorb::drop_second`] in case just dropping `T` is not safe and sufficient.
    ///
    /// If you used the default implementation of [`Absorb::drop_second`] (which just calls
    /// [`drop`](Drop::drop)) you don't need to call [`Absorb::drop_second`].
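    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below). `Counter` uses the default `drop_second`, so just dropping the box
    /// is sufficient:
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (w, _r) = left_right::new_from_empty::<Counter, Op>(Counter(1));
    /// let taken = w.take();
    /// // safety: `Counter` uses the default `Absorb::drop_second`, so plain dropping suffices
    /// let boxed = unsafe { taken.into_box() };
    /// assert_eq!(boxed.0, 1);
    /// ```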
    pub unsafe fn into_box(mut self) -> Box<T> {
        self.inner
            .take()
            .expect("inner is only taken here then self is dropped")
    }
}

impl<T: Absorb<O>, O> Drop for Taken<T, O> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            T::drop_second(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
    ///
    /// Makes sure that all pending operations are applied and waits until all read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
    fn take_inner(&mut self) -> Option<Taken<T, O>> {
        use std::ptr;
        // Can only take inner once.
        if self.taken {
            return None;
        }

        // Disallow taking again.
        self.taken = true;

        // first, ensure both copies are up to date
        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to both w_handle and r_handle.
        // give the underlying data structure an opportunity to handle the one copy differently:
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        Some(Taken {
            inner: Some(boxed_r_handle),
            _marker: PhantomData,
        })
    }
}

impl<T, O> Drop for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    fn drop(&mut self) {
        if let Some(inner) = self.take_inner() {
            drop(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
            first: true,
            second: true,
            taken: false,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Try to publish once without waiting.
    ///
    /// This performs a single, non-blocking check of reader epochs. If all current readers have
    /// advanced since the last swap, it performs a publish and returns `true`. If any reader may
    /// still be accessing the old copy, it does nothing and returns `false`.
    ///
    /// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
    /// paths where skipping a publish is preferable to blocking; call again later or fall back to
    /// [`publish`](Self::publish) if you must ensure visibility.
    ///
    /// Returns `true` if a publish occurred, `false` otherwise.
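    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.publish();
    /// let guard = r.enter().unwrap();
    /// w.append(Op::Add(1));
    /// w.publish(); // records the reader's (odd) epoch at this swap
    /// w.append(Op::Add(1));
    /// // the reader has not moved since the swap, so we must not swap again yet
    /// assert!(!w.try_publish());
    /// drop(guard);
    /// // with the reader gone, the non-blocking publish succeeds
    /// assert!(w.try_publish());
    /// ```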
    pub fn try_publish(&mut self) -> bool {
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        // This check is exactly like the loop in `wait`, except that if we find a reader that
        // has not observed the latest swap, we return rather than spin-and-retry.
        self.last_epochs.resize(epochs.capacity(), 0);
        for (ri, epoch) in epochs.iter() {
            // see `wait` for why even epochs can be skipped
            if self.last_epochs[ri] % 2 == 0 {
                continue;
            }

            // an unchanged (odd) epoch means the reader may still be in the old copy
            if epoch.load(Ordering::Acquire) == self.last_epochs[ri] {
                return false;
            }
        }

        self.update_and_swap(&mut epochs);

        true
    }

    /// Publish all operations appended to the log so they become visible to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
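    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.append(Op::Add(2)).publish();
    /// assert_eq!(r.enter().unwrap().0, 2);
    /// ```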
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        self.update_and_swap(&mut epochs)
    }

    /// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
    ///
    /// This method must only be called when all readers have exited `w_handle` (e.g., after
    /// `wait`).
    fn update_and_swap(
        &mut self,
        epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
    ) -> &mut Self {
        if !self.first {
            // all the readers have left!
            // safety: we haven't freed the Box, and no readers are accessing the w_handle
            let w_handle = unsafe { self.w_handle.as_mut() };

            // safety: we will not swap while we hold this reference
            let r_handle = unsafe {
                self.r_handle
                    .inner
                    .load(Ordering::Acquire)
                    .as_ref()
                    .unwrap()
            };

            if self.second {
                Absorb::sync_with(w_handle, r_handle);
                self.second = false
            }

            // the w_handle copy has not seen any of the writes in the oplog
            // the r_handle copy has not seen any of the writes following swap_index
            if self.swap_index != 0 {
                // we can drain out the operations that only the w_handle copy needs
                //
                // NOTE: the if above is just to avoid a pointless drain(0..0)
                for op in self.oplog.drain(0..self.swap_index) {
                    T::absorb_second(w_handle, op, r_handle);
                }
            }
            // we cannot give owned operations to absorb_first
            // since they'll also be needed by the r_handle copy
            for op in self.oplog.iter_mut() {
                T::absorb_first(w_handle, op, r_handle);
            }
            // the w_handle copy is about to become the r_handle, and can ignore the oplog
            self.swap_index = self.oplog.len();

        // w_handle (the old r_handle) is now fully up to date!
        } else {
            self.first = false
        }

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the copies.
    /// This method will only do so if there are pending operations.
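    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.publish(); // after the first publish, appends go to the oplog
    /// w.append(Op::Add(1));
    /// w.flush(); // an operation is pending, so this publishes
    /// assert_eq!(r.enter().unwrap().0, 1);
    /// w.flush(); // nothing pending: no publish, no waiting for readers
    /// ```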
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
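    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, _r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.publish(); // once published, later appends go to the oplog
    /// assert!(!w.has_pending_operations());
    /// w.append(Op::Add(1));
    /// assert!(w.has_pending_operations());
    /// w.publish();
    /// assert!(!w.has_pending_operations());
    /// ```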
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
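    ///
    /// # Example
    ///
    /// Appends can be chained before a final publish. A minimal sketch, reusing the illustrative
    /// `Counter`/`Op` types from the [`WriteHandle`] docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.append(Op::Add(1)).append(Op::Add(2)).publish();
    /// assert_eq!(r.enter().unwrap().0, 3);
    /// ```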
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
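    ///
    /// # Example
    ///
    /// A minimal sketch of the one case described above as safe (mutating before the first
    /// `publish`), reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`] docs
    /// (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// let mut ptr = w.raw_write_handle();
    /// // safety: no publish has happened yet, so no reader has ever seen this copy
    /// unsafe { ptr.as_mut().0 = 42 };
    /// w.publish();
    /// assert_eq!(r.enter().unwrap().0, 42);
    /// ```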
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all pending operations are applied and waits until all read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
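    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, _r) = left_right::new_from_empty::<Counter, Op>(Counter(2));
    /// w.append(Op::Add(1));
    /// let data = w.take(); // applies pending operations and waits for readers
    /// assert_eq!(data.0, 3);
    /// ```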
    pub fn take(mut self) -> Taken<T, O> {
        // It is always safe to `expect` here because `take_inner` is private,
        // and it is only called here and in the drop impl. Since we have an owned
        // `self`, we know drop has not yet been called. And the first call to
        // `take_inner` always returns `Some`.
        self.take_inner()
            .expect("inner is only taken here then self is dropped")
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<T, O> Deref for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<T, O> Extend<O> for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
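    ///
    /// # Example
    ///
    /// A minimal sketch, reusing the illustrative `Counter`/`Op` types from the [`WriteHandle`]
    /// docs (hidden below):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # enum Op { Add(i32) }
    /// # impl left_right::Absorb<Op> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Op, _: &Self) {
    /// #         match op { Op::Add(n) => self.0 += *n }
    /// #     }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0 }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Op>(Counter(0));
    /// w.extend((1..=3).map(Op::Add));
    /// w.publish();
    /// assert_eq!(r.enter().unwrap().0, 6);
    /// ```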
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        if self.first {
            // Safety: we know there are no outstanding w_handle readers, since we haven't
            // refreshed ever before, so we can modify it directly!
            let mut w_inner = self.raw_write_handle();
            let w_inner = unsafe { w_inner.as_mut() };
            let r_handle = self.enter().expect("map has not yet been destroyed");
            // Because we are operating directly on the data, and nothing is aliased, we do want
            // to perform drops, so we invoke absorb_second.
            for op in ops {
                Absorb::absorb_second(w_inner, op, &*r_handle);
            }
        } else {
            self.oplog.extend(ops);
        }
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that, of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
    use crate::{read, Absorb};
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let (mut w, _r) = crate::new::<i32, _>();
        assert_eq!(w.first, true);
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 0);
        assert_eq!(w.first, true);
        w.publish();
        assert_eq!(w.first, false);
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 2);
    }

    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let (w, _r) = crate::new_from_empty::<i32, _>(2);
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: the epochs list is empty (the default).
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait returns immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and unchanged from
        // last_epochs) when wait is called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure the writer has entered wait() before we update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait returns only after the held epoch has advanced.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let (mut w, r) = crate::new::<i32, _>();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let (mut w, _) = crate::new::<i32, _>();

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle copy).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }

    #[test]
    fn try_publish() {
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: a reader has not advanced (odd and unchanged) -> returns false
        let mut epochs_slab = Slab::new();
        // odd epoch: the reader is "in" a read
        let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1)));
        // ensure last_epochs sees this reader as odd and unchanged
        w.last_epochs = vec![0; epochs_slab.capacity()];
        w.last_epochs[idx] = 1;
        w.epochs = Arc::new(Mutex::new(epochs_slab));
        assert_eq!(w.try_publish(), false);

        // Case 2: all readers have advanced since the last swap -> returns true and publishes
        let mut epochs_slab_ok = Slab::new();
        let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
        w.last_epochs = vec![0; epochs_slab_ok.capacity()];
        w.last_epochs[idx_ok] = 1; // previously odd
        w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
        let before = w.refreshes;
        assert_eq!(w.try_publish(), true);
        assert_eq!(w.refreshes, before + 1);
    }
}