left_right/
write.rs

1use crate::read::ReadHandle;
2use crate::Absorb;
3
4use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
5use std::collections::VecDeque;
6use std::marker::PhantomData;
7use std::ops::DerefMut;
8use std::ptr::NonNull;
9#[cfg(test)]
10use std::sync::atomic::AtomicBool;
11use std::{fmt, thread};
12
13/// A writer handle to a left-right guarded data structure.
14///
15/// All operations on the underlying data should be enqueued as operations of type `O` using
16/// [`append`](Self::append). The effect of this operations are only exposed to readers once
17/// [`publish`](Self::publish) is called.
18///
19/// # Reading through a `WriteHandle`
20///
21/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
22/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
23/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
24/// operations prior to the last call to [`publish`](Self::publish).
25pub struct WriteHandle<T, O>
26where
27    T: Absorb<O>,
28{
29    epochs: crate::Epochs,
30    w_handle: NonNull<T>,
31    oplog: VecDeque<O>,
32    swap_index: usize,
33    r_handle: ReadHandle<T>,
34    last_epochs: Vec<usize>,
35    #[cfg(test)]
36    refreshes: usize,
37    #[cfg(test)]
38    is_waiting: Arc<AtomicBool>,
39    /// Write directly to the write handle map, since no publish has happened.
40    first: bool,
41    /// A publish has happened, but the two copies have not been synchronized yet.
42    second: bool,
43    /// If we call `Self::take` the drop needs to be different.
44    taken: bool,
45}
46
47// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
48// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
49// `ReadHandle`, we also need to respect its Send requirements.
50unsafe impl<T, O> Send for WriteHandle<T, O>
51where
52    T: Absorb<O>,
53    T: Send,
54    O: Send,
55    ReadHandle<T>: Send,
56{
57}
58
59impl<T, O> fmt::Debug for WriteHandle<T, O>
60where
61    T: Absorb<O> + fmt::Debug,
62    O: fmt::Debug,
63{
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        f.debug_struct("WriteHandle")
66            .field("epochs", &self.epochs)
67            .field("w_handle", &self.w_handle)
68            .field("oplog", &self.oplog)
69            .field("swap_index", &self.swap_index)
70            .field("r_handle", &self.r_handle)
71            .field("first", &self.first)
72            .field("second", &self.second)
73            .finish()
74    }
75}
76
77/// A **smart pointer** to an owned backing data structure. This makes sure that the
78/// data is dropped correctly (using [`Absorb::drop_second`]).
79///
80/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
81pub struct Taken<T: Absorb<O>, O> {
82    inner: Option<Box<T>>,
83    _marker: PhantomData<O>,
84}
85
86impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("Taken")
89            .field(
90                "inner",
91                self.inner
92                    .as_ref()
93                    .expect("inner is only taken in `into_box` which drops self"),
94            )
95            .finish()
96    }
97}
98
99impl<T: Absorb<O>, O> Deref for Taken<T, O> {
100    type Target = T;
101
102    fn deref(&self) -> &Self::Target {
103        self.inner
104            .as_ref()
105            .expect("inner is only taken in `into_box` which drops self")
106    }
107}
108
109impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
110    fn deref_mut(&mut self) -> &mut Self::Target {
111        self.inner
112            .as_mut()
113            .expect("inner is only taken in `into_box` which drops self")
114    }
115}
116
117impl<T: Absorb<O>, O> Taken<T, O> {
118    /// This is unsafe because you must call [`Absorb::drop_second`] in
119    /// case just dropping `T` is not safe and sufficient.
120    ///
121    /// If you used the default implementation of [`Absorb::drop_second`] (which just calls [`drop`](Drop::drop))
122    /// you don't need to call [`Absorb::drop_second`].
123    pub unsafe fn into_box(mut self) -> Box<T> {
124        self.inner
125            .take()
126            .expect("inner is only taken here then self is dropped")
127    }
128}
129
130impl<T: Absorb<O>, O> Drop for Taken<T, O> {
131    fn drop(&mut self) {
132        if let Some(inner) = self.inner.take() {
133            T::drop_second(inner);
134        }
135    }
136}
137
138impl<T, O> WriteHandle<T, O>
139where
140    T: Absorb<O>,
141{
142    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
143    ///
144    /// Makes sure that all the pending operations are applied and waits till all the read handles
145    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
146    /// returns the other copy as a [`Taken`] smart pointer.
147    fn take_inner(&mut self) -> Option<Taken<T, O>> {
148        use std::ptr;
149        // Can only take inner once.
150        if self.taken {
151            return None;
152        }
153
154        // Disallow taking again.
155        self.taken = true;
156
157        // first, ensure both copies are up to date
158        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
159        if self.first || !self.oplog.is_empty() {
160            self.publish();
161        }
162        if !self.oplog.is_empty() {
163            self.publish();
164        }
165        assert!(self.oplog.is_empty());
166
167        // next, grab the read handle and set it to NULL
168        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);
169
170        // now, wait for all readers to depart
171        let epochs = Arc::clone(&self.epochs);
172        let mut epochs = epochs.lock().unwrap();
173        self.wait(&mut epochs);
174
175        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
176        fence(Ordering::SeqCst);
177
178        // all readers have now observed the NULL, so we own both handles.
179        // all operations have been applied to both w_handle and r_handle.
180        // give the underlying data structure an opportunity to handle the one copy differently:
181        //
182        // safety: w_handle was initially crated from a `Box`, and is no longer aliased.
183        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });
184
185        // next we take the r_handle and return it as a boxed value.
186        //
187        // this is safe, since we know that no readers are using this pointer
188        // anymore (due to the .wait() following swapping the pointer with NULL).
189        //
190        // safety: r_handle was initially crated from a `Box`, and is no longer aliased.
191        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };
192
193        Some(Taken {
194            inner: Some(boxed_r_handle),
195            _marker: PhantomData,
196        })
197    }
198}
199
200impl<T, O> Drop for WriteHandle<T, O>
201where
202    T: Absorb<O>,
203{
204    fn drop(&mut self) {
205        if let Some(inner) = self.take_inner() {
206            drop(inner);
207        }
208    }
209}
210
211impl<T, O> WriteHandle<T, O>
212where
213    T: Absorb<O>,
214{
215    pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
216        Self {
217            epochs,
218            // safety: Box<T> is not null and covariant.
219            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
220            oplog: VecDeque::new(),
221            swap_index: 0,
222            r_handle,
223            last_epochs: Vec::new(),
224            #[cfg(test)]
225            is_waiting: Arc::new(AtomicBool::new(false)),
226            #[cfg(test)]
227            refreshes: 0,
228            first: true,
229            second: true,
230            taken: false,
231        }
232    }
233
234    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
235        let mut iter = 0;
236        let mut starti = 0;
237
238        #[cfg(test)]
239        {
240            self.is_waiting.store(true, Ordering::Relaxed);
241        }
242        // we're over-estimating here, but slab doesn't expose its max index
243        self.last_epochs.resize(epochs.capacity(), 0);
244        'retry: loop {
245            // read all and see if all have changed (which is likely)
246            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
247                // if the reader's epoch was even last we read it (which was _after_ the swap),
248                // then they either do not have the pointer, or must have read the pointer strictly
249                // after the swap. in either case, they cannot be using the old pointer value (what
250                // is now w_handle).
251                //
252                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
253                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
254                //
255                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
256                // this is okay though, as a change still implies that the new reader must have
257                // arrived _after_ we did the atomic swap, and thus must also have seen the new
258                // pointer.
259                if self.last_epochs[ri] % 2 == 0 {
260                    continue;
261                }
262
263                let now = epoch.load(Ordering::Acquire);
264                if now != self.last_epochs[ri] {
265                    // reader must have seen the last swap, since they have done at least one
266                    // operation since we last looked at their epoch, which _must_ mean that they
267                    // are no longer using the old pointer value.
268                } else {
269                    // reader may not have seen swap
270                    // continue from this reader's epoch
271                    starti = ii;
272
273                    if !cfg!(loom) {
274                        // how eagerly should we retry?
275                        if iter != 20 {
276                            iter += 1;
277                        } else {
278                            thread::yield_now();
279                        }
280                    }
281
282                    #[cfg(loom)]
283                    loom::thread::yield_now();
284
285                    continue 'retry;
286                }
287            }
288            break;
289        }
290        #[cfg(test)]
291        {
292            self.is_waiting.store(false, Ordering::Relaxed);
293        }
294    }
295
296    /// Publish all operations append to the log to reads.
297    ///
298    /// This method needs to wait for all readers to move to the "other" copy of the data so that
299    /// it can replay the operational log onto the stale copy the readers used to use. This can
300    /// take some time, especially if readers are executing slow operations, or if there are many
301    /// of them.
302    pub fn publish(&mut self) -> &mut Self {
303        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
304        // flag has been observed to be on for two subsequent iterations (there still may be some
305        // readers present since we did the previous refresh)
306        //
307        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
308        // only block on pre-existing readers, and they are never waiting to push onto epochs
309        // unless they have finished reading.
310        let epochs = Arc::clone(&self.epochs);
311        let mut epochs = epochs.lock().unwrap();
312
313        self.wait(&mut epochs);
314
315        if !self.first {
316            // all the readers have left!
317            // safety: we haven't freed the Box, and no readers are accessing the w_handle
318            let w_handle = unsafe { self.w_handle.as_mut() };
319
320            // safety: we will not swap while we hold this reference
321            let r_handle = unsafe {
322                self.r_handle
323                    .inner
324                    .load(Ordering::Acquire)
325                    .as_ref()
326                    .unwrap()
327            };
328
329            if self.second {
330                Absorb::sync_with(w_handle, r_handle);
331                self.second = false
332            }
333
334            // the w_handle copy has not seen any of the writes in the oplog
335            // the r_handle copy has not seen any of the writes following swap_index
336            if self.swap_index != 0 {
337                // we can drain out the operations that only the w_handle copy needs
338                //
339                // NOTE: the if above is because drain(0..0) would remove 0
340                for op in self.oplog.drain(0..self.swap_index) {
341                    T::absorb_second(w_handle, op, r_handle);
342                }
343            }
344            // we cannot give owned operations to absorb_first
345            // since they'll also be needed by the r_handle copy
346            for op in self.oplog.iter_mut() {
347                T::absorb_first(w_handle, op, r_handle);
348            }
349            // the w_handle copy is about to become the r_handle, and can ignore the oplog
350            self.swap_index = self.oplog.len();
351
352        // w_handle (the old r_handle) is now fully up to date!
353        } else {
354            self.first = false
355        }
356
357        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
358        // writes. the stale r_handle is accessed by readers through an Arc clone of atomic pointer
359        // inside the ReadHandle. oplog contains all the changes that are in w_handle, but not in
360        // r_handle.
361        //
362        // it's now time for us to swap the copies so that readers see up-to-date results from
363        // w_handle.
364
365        // swap in our w_handle, and get r_handle in return
366        let r_handle = self
367            .r_handle
368            .inner
369            .swap(self.w_handle.as_ptr(), Ordering::Release);
370
371        // NOTE: at this point, there are likely still readers using r_handle.
372        // safety: r_handle was also created from a Box, so it is not null and is covariant.
373        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };
374
375        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
376        fence(Ordering::SeqCst);
377
378        for (ri, epoch) in epochs.iter() {
379            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
380        }
381
382        #[cfg(test)]
383        {
384            self.refreshes += 1;
385        }
386
387        self
388    }
389
390    /// Publish as necessary to ensure that all operations are visible to readers.
391    ///
392    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the maps.
393    /// This method will only do so if there are pending operations.
394    pub fn flush(&mut self) {
395        if self.has_pending_operations() {
396            self.publish();
397        }
398    }
399
400    /// Returns true if there are operations in the operational log that have not yet been exposed
401    /// to readers.
402    pub fn has_pending_operations(&self) -> bool {
403        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
404        // there are operations that have not yet been applied to the _write_ handle.
405        self.swap_index < self.oplog.len()
406    }
407
408    /// Append the given operation to the operational log.
409    ///
410    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
411    pub fn append(&mut self, op: O) -> &mut Self {
412        self.extend(std::iter::once(op));
413        self
414    }
415
416    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
417    ///
418    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
419    /// readers still present in this copy. This is not normally something you know; even after
420    /// calling `publish`, readers may still be in the write copy for some time. In general, the
421    /// only time you know this is okay is before the first call to `publish` (since no readers
422    /// ever entered the write copy).
423    // TODO: Make this return `Option<&mut T>`,
424    // and only `Some` if there are indeed to readers in the write copy.
425    pub fn raw_write_handle(&mut self) -> NonNull<T> {
426        self.w_handle
427    }
428
429    /// Returns the backing data structure.
430    ///
431    /// Makes sure that all the pending operations are applied and waits till all the read handles
432    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
433    /// returns the other copy as a [`Taken`] smart pointer.
434    pub fn take(mut self) -> Taken<T, O> {
435        // It is always safe to `expect` here because `take_inner` is private
436        // and it is only called here and in the drop impl. Since we have an owned
437        // `self` we know the drop has not yet been called. And every first call of
438        // `take_inner` returns `Some`
439        self.take_inner()
440            .expect("inner is only taken here then self is dropped")
441    }
442}
443
444// allow using write handle for reads
445use std::ops::Deref;
446impl<T, O> Deref for WriteHandle<T, O>
447where
448    T: Absorb<O>,
449{
450    type Target = ReadHandle<T>;
451    fn deref(&self) -> &Self::Target {
452        &self.r_handle
453    }
454}
455
456impl<T, O> Extend<O> for WriteHandle<T, O>
457where
458    T: Absorb<O>,
459{
460    /// Add multiple operations to the operational log.
461    ///
462    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish)
463    fn extend<I>(&mut self, ops: I)
464    where
465        I: IntoIterator<Item = O>,
466    {
467        if self.first {
468            // Safety: we know there are no outstanding w_handle readers, since we haven't
469            // refreshed ever before, so we can modify it directly!
470            let mut w_inner = self.raw_write_handle();
471            let w_inner = unsafe { w_inner.as_mut() };
472            let r_handle = self.enter().expect("map has not yet been destroyed");
473            // Because we are operating directly on the map, and nothing is aliased, we do want
474            // to perform drops, so we invoke absorb_second.
475            for op in ops {
476                Absorb::absorb_second(w_inner, op, &*r_handle);
477            }
478        } else {
479            self.oplog.extend(ops);
480        }
481    }
482}
483
/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
// This type exists only to carry the doctests above; it is never constructed.
#[allow(dead_code)]
struct CheckWriteHandleSend;
558
#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Absorb;
    use slab::Slab;
    include!("./utilities.rs");

    /// Before the first publish, appended operations bypass the oplog; afterwards they queue.
    #[test]
    fn append_test() {
        let (mut w, _r) = crate::new::<i32, _>();
        assert!(w.first);
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 0);
        assert!(w.first);
        w.publish();
        assert!(!w.first);
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 2);
    }

    /// `take` returns the fully up-to-date data regardless of how many publishes preceded it.
    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let (w, _r) = crate::new_from_empty::<i32, _>(2);
        assert_eq!(*w.take(), 2);
    }

    /// `wait` returns immediately without readers and blocks on a reader with an odd epoch.
    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: If epoch is set to default.
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there is no epoch to wait for, the wait function will return immediately.
        w.wait(&mut test_epochs);

        // Case 2: If one of the readers is still reading (epoch is odd and count is the same as
        // in last_epochs) and wait has been called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        assert!(!is_waiting.load(Ordering::Relaxed));

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure that the writer's wait() is called first, and only then allow the held
        // epoch to be updated.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait returns after the progress/increment of held_epoch.
        // a panic on the writer thread must fail the test instead of being ignored.
        wait_handle
            .join()
            .expect("writer thread panicked while waiting");
    }

    /// With nothing pending there is no reason to publish, even while a reader is pinned.
    #[test]
    fn flush_noblock() {
        let (mut w, r) = crate::new::<i32, _>();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    /// `has_pending_operations` tracks the oplog, and every `publish` counts as a refresh.
    #[test]
    fn flush_no_refresh() {
        let (mut w, _) = crate::new::<i32, _>();

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle table).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
}