left_right/
write.rs

use crate::read::ReadHandle;
use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use crate::Absorb;
use crossbeam_utils::CachePadded;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::ptr::NonNull;
use std::{fmt, thread};

#[cfg(test)]
use std::sync::atomic::AtomicBool;

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
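///
/// # Example
///
/// A minimal sketch; the `Counter` and `Add` types below are illustrative stand-ins, not part of
/// this crate:
///
/// ```
/// #[derive(Clone)]
/// struct Counter(i32);
/// struct Add(i32);
///
/// impl left_right::Absorb<Add> for Counter {
///     fn absorb_first(&mut self, op: &mut Add, _: &Self) {
///         self.0 += op.0;
///     }
///     fn sync_with(&mut self, first: &Self) {
///         self.0 = first.0;
///     }
/// }
///
/// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
/// w.append(Add(2));
/// w.append(Add(3));
/// w.publish(); // the appended operations only become visible to readers here
/// assert_eq!(r.enter().map(|guard| guard.0), Some(5));
/// // reads may also go through the write handle via `Deref`
/// assert_eq!(w.enter().map(|guard| guard.0), Some(5));
/// ```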
pub struct WriteHandle<T, O>
where
    T: Absorb<O>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
    /// Write directly to the write handle map, since no publish has happened.
    first: bool,
    /// A publish has happened, but the two copies have not been synchronized yet.
    second: bool,
    /// If we call `Self::take` the drop needs to be different.
    taken: bool,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<T, O> Send for WriteHandle<T, O>
where
    T: Absorb<O>,
    T: Send,
    O: Send,
    ReadHandle<T>: Send,
{
}

impl<T, O> fmt::Debug for WriteHandle<T, O>
where
    T: Absorb<O> + fmt::Debug,
    O: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("first", &self.first)
            .field("second", &self.second)
            .finish()
    }
}

/// A **smart pointer** to an owned backing data structure. This makes sure that the
/// data is dropped correctly (using [`Absorb::drop_second`]).
///
/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
pub struct Taken<T: Absorb<O>, O> {
    inner: Option<Box<T>>,
    _marker: PhantomData<O>,
}

impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Taken")
            .field(
                "inner",
                self.inner
                    .as_ref()
                    .expect("inner is only taken in `into_box` which drops self"),
            )
            .finish()
    }
}

impl<T: Absorb<O>, O> Deref for Taken<T, O> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.inner
            .as_ref()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner
            .as_mut()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> Taken<T, O> {
    /// # Safety
    ///
    /// You must call [`Absorb::drop_second`] in case just dropping `T` is not safe and sufficient.
    ///
    /// If you used the default implementation of [`Absorb::drop_second`] (which just calls
    /// [`drop`](Drop::drop)) you don't need to call [`Absorb::drop_second`].
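    ///
    /// A sketch (the `Counter` type is an illustrative stand-in whose `drop_second` is the
    /// default, so plain dropping suffices):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (w, _r) = left_right::new_from_empty::<Counter, Add>(Counter(1));
    /// let taken = w.take();
    /// // safety: `Counter` uses the default `drop_second`, which just drops the value
    /// let boxed: Box<Counter> = unsafe { taken.into_box() };
    /// assert_eq!(boxed.0, 1);
    /// ```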
    pub unsafe fn into_box(mut self) -> Box<T> {
        self.inner
            .take()
            .expect("inner is only taken here then self is dropped")
    }
}

impl<T: Absorb<O>, O> Drop for Taken<T, O> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            T::drop_second(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
    ///
    /// Makes sure that all the pending operations are applied and waits till all the read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
    fn take_inner(&mut self) -> Option<Taken<T, O>> {
        use std::ptr;
        // Can only take inner once.
        if self.taken {
            return None;
        }

        // Disallow taking again.
        self.taken = true;

        // first, ensure both copies are up to date
        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to both w_handle and r_handle.
        // give the underlying data structure an opportunity to handle the one copy differently:
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        Some(Taken {
            inner: Some(boxed_r_handle),
            _marker: PhantomData,
        })
    }
}

impl<T, O> Drop for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    fn drop(&mut self) {
        if let Some(inner) = self.take_inner() {
            drop(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
            first: true,
            second: true,
            taken: false,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<CachePadded<AtomicUsize>>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Try to publish once without waiting.
    ///
    /// This performs a single, non-blocking check of reader epochs. If all current readers have
    /// advanced since the last swap, it performs a publish and returns `true`. If any reader may
    /// still be accessing the old copy, it does nothing and returns `false`.
    ///
    /// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
    /// paths where skipping a publish is preferable to blocking; call again later or fall back to
    /// [`publish`](Self::publish) if you must ensure visibility.
    ///
    /// Returns `true` if a publish occurred, `false` otherwise.
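    ///
    /// # Example
    ///
    /// A sketch of the fallback pattern (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.append(Add(1));
    /// if !w.try_publish() {
    ///     // a reader may still be in the old copy; block until it leaves
    ///     w.publish();
    /// }
    /// assert_eq!(r.enter().map(|guard| guard.0), Some(1));
    /// ```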
    pub fn try_publish(&mut self) -> bool {
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        // This wait loop is exactly like the one in wait, except that if we find a reader that
        // has not observed the latest swap, we return rather than spin-and-retry.
        self.last_epochs.resize(epochs.capacity(), 0);
        for (ri, epoch) in epochs.iter() {
            if self.last_epochs[ri] % 2 == 0 {
                continue;
            }

            let now = epoch.load(Ordering::Acquire);
            if now != self.last_epochs[ri] {
                continue;
            } else {
                return false;
            }
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
        self.update_and_swap(&mut epochs);

        true
    }

    /// Publish all operations appended to the log so they become visible to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
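    ///
    /// # Example
    ///
    /// A sketch (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.publish(); // swap once so that later appends go through the oplog
    /// w.append(Add(7));
    /// assert_eq!(r.enter().map(|guard| guard.0), Some(0)); // not yet visible
    /// w.publish();
    /// assert_eq!(r.enter().map(|guard| guard.0), Some(7)); // now visible
    /// ```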
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        self.update_and_swap(&mut epochs)
    }

    /// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
    ///
    /// This method must only be called when all readers have exited `w_handle` (e.g., after
    /// `wait`).
    fn update_and_swap(
        &mut self,
        epochs: &mut MutexGuard<'_, slab::Slab<Arc<CachePadded<AtomicUsize>>>>,
    ) -> &mut Self {
        if !self.first {
            // all the readers have left!
            // safety: we haven't freed the Box, and no readers are accessing the w_handle
            let w_handle = unsafe { self.w_handle.as_mut() };

            // safety: we will not swap while we hold this reference
            let r_handle = unsafe {
                self.r_handle
                    .inner
                    .load(Ordering::Acquire)
                    .as_ref()
                    .unwrap()
            };

            if self.second {
                Absorb::sync_with(w_handle, r_handle);
                self.second = false
            }

            // the w_handle copy has not seen any of the writes in the oplog
            // the r_handle copy has not seen any of the writes following swap_index
            if self.swap_index != 0 {
                // we can drain out the operations that only the w_handle copy needs
                //
                // NOTE: the if above is because drain(0..0) would remove 0 elements (a pointless no-op)
                for op in self.oplog.drain(0..self.swap_index) {
                    T::absorb_second(w_handle, op, r_handle);
                }
            }
            // we cannot give owned operations to absorb_first
            // since they'll also be needed by the r_handle copy
            for op in self.oplog.iter_mut() {
                T::absorb_first(w_handle, op, r_handle);
            }
            // the w_handle copy is about to become the r_handle, and can ignore the oplog
            self.swap_index = self.oplog.len();

        // w_handle (the old r_handle) is now fully up to date!
        } else {
            self.first = false
        }

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the maps.
    /// This method will only do so if there are pending operations.
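    ///
    /// # Example
    ///
    /// A sketch (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, _r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.publish(); // get past the initial direct-write phase
    /// w.flush(); // no pending operations: does not swap or wait
    /// w.append(Add(1));
    /// w.flush(); // there is now a pending operation, so this publishes
    /// ```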
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
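    ///
    /// A sketch (`Counter` and `Add` are illustrative stand-ins; note that appends made before
    /// the first `publish` are applied directly to the write copy and so never count as pending):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, _r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.publish();
    /// assert!(!w.has_pending_operations());
    /// w.append(Add(1));
    /// assert!(w.has_pending_operations());
    /// w.publish();
    /// assert!(!w.has_pending_operations());
    /// ```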
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
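    ///
    /// A sketch of that one clearly-safe case (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// // no `publish` has happened yet, so no reader has ever seen the write copy
    /// unsafe { w.raw_write_handle().as_mut() }.0 = 42;
    /// w.publish();
    /// assert_eq!(r.enter().map(|guard| guard.0), Some(42));
    /// ```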
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all the pending operations are applied and waits till all the read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
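    ///
    /// # Example
    ///
    /// A sketch (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.append(Add(5));
    /// let taken = w.take(); // applies pending operations, then waits for readers to depart
    /// assert_eq!(taken.0, 5);
    /// // readers can no longer access the data structure
    /// assert!(r.enter().is_none());
    /// ```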
    pub fn take(mut self) -> Taken<T, O> {
        // It is always safe to `expect` here because `take_inner` is private
        // and it is only called here and in the drop impl. Since we have an owned
        // `self` we know the drop has not yet been called. And every first call of
        // `take_inner` returns `Some`
        self.take_inner()
            .expect("inner is only taken here then self is dropped")
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<T, O> Deref for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<T, O> Extend<O> for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
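    ///
    /// A sketch (`Counter` and `Add` are illustrative stand-ins):
    ///
    /// ```
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # struct Add(i32);
    /// # impl left_right::Absorb<Add> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut Add, _: &Self) { self.0 += op.0; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, Add>(Counter(0));
    /// w.extend(vec![Add(1), Add(2), Add(3)]);
    /// w.publish();
    /// assert_eq!(r.enter().map(|guard| guard.0), Some(6));
    /// ```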
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        if self.first {
            // Safety: we know there are no outstanding w_handle readers, since we haven't
            // refreshed ever before, so we can modify it directly!
            let mut w_inner = self.raw_write_handle();
            let w_inner = unsafe { w_inner.as_mut() };
            let r_handle = self.enter().expect("map has not yet been destroyed");
            // Because we are operating directly on the map, and nothing is aliased, we do want
            // to perform drops, so we invoke absorb_second.
            for op in ops {
                Absorb::absorb_second(w_inner, op, &*r_handle);
            }
        } else {
            self.oplog.extend(ops);
        }
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that, of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
    use crate::{read, Absorb};
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let (mut w, _r) = crate::new::<i32, _>();
        assert_eq!(w.first, true);
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 0);
        assert_eq!(w.first, true);
        w.publish();
        assert_eq!(w.first, false);
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 2);
    }

    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let (w, _r) = crate::new_from_empty::<i32, _>(2);
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: the epoch list is empty (the default).
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait will return immediately.
        w.wait(&mut test_epochs);

        // Case 2: one of the readers is still reading (its epoch is odd and the same as in
        // last_epochs) when wait is called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure the writer has entered wait() before we update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait returns only after the held epoch has made
        // progress (been incremented).
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let (mut w, r) = crate::new::<i32, _>();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let (mut w, _) = crate::new::<i32, _>();

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle table).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }

    #[test]
    fn try_publish() {
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: A reader has not advanced (odd and unchanged) -> returns false
        let mut epochs_slab = Slab::new();
        let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
        // Ensure last_epochs sees this reader as odd and unchanged
        w.last_epochs = vec![0; epochs_slab.capacity()];
        w.last_epochs[idx] = 1;
        w.epochs = Arc::new(Mutex::new(epochs_slab));
        assert_eq!(w.try_publish(), false);

        // Case 2: All readers have advanced since last swap -> returns true and publishes
        let mut epochs_slab_ok = Slab::new();
        let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
        w.last_epochs = vec![0; epochs_slab_ok.capacity()];
        w.last_epochs[idx_ok] = 1; // previously odd
        w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
        let before = w.refreshes;
        assert_eq!(w.try_publish(), true);
        assert_eq!(w.refreshes, before + 1);
    }
}