left_right/write.rs
use crate::read::ReadHandle;
use crate::Absorb;

use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::ptr::NonNull;
#[cfg(test)]
use std::sync::atomic::AtomicBool;
use std::{fmt, thread};

/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
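///
/// # Example
///
/// A minimal sketch of the intended workflow. The `Counter` type and the `i32` "add"
/// operation below are illustrative only; they are not part of this crate:
///
/// ```rust
/// #[derive(Clone)]
/// struct Counter(i32);
///
/// impl left_right::Absorb<i32> for Counter {
///     fn absorb_first(&mut self, op: &mut i32, _other: &Self) {
///         self.0 += *op;
///     }
///     fn sync_with(&mut self, first: &Self) {
///         self.0 = first.0;
///     }
/// }
///
/// let (mut w, r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
/// w.append(1);
/// // neither independent readers nor reads through the write handle see the
/// // operation until it is published
/// assert_eq!(r.enter().map(|c| c.0), Some(0));
/// assert_eq!(w.enter().map(|c| c.0), Some(0));
/// w.publish();
/// assert_eq!(r.enter().map(|c| c.0), Some(1));
/// ```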
pub struct WriteHandle<T, O>
where
    T: Absorb<O>,
{
    epochs: crate::Epochs,
    w_handle: NonNull<T>,
    oplog: VecDeque<O>,
    swap_index: usize,
    r_handle: ReadHandle<T>,
    last_epochs: Vec<usize>,
    #[cfg(test)]
    refreshes: usize,
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
    /// Write directly to the write handle map, since no publish has happened.
    first: bool,
    /// A publish has happened, but the two copies have not been synchronized yet.
    second: bool,
    /// If we call `Self::take` the drop needs to be different.
    taken: bool,
}

// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
// `ReadHandle`, we also need to respect its Send requirements.
unsafe impl<T, O> Send for WriteHandle<T, O>
where
    T: Absorb<O>,
    T: Send,
    O: Send,
    ReadHandle<T>: Send,
{
}

impl<T, O> fmt::Debug for WriteHandle<T, O>
where
    T: Absorb<O> + fmt::Debug,
    O: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteHandle")
            .field("epochs", &self.epochs)
            .field("w_handle", &self.w_handle)
            .field("oplog", &self.oplog)
            .field("swap_index", &self.swap_index)
            .field("r_handle", &self.r_handle)
            .field("first", &self.first)
            .field("second", &self.second)
            .finish()
    }
}

/// A **smart pointer** to an owned backing data structure. This makes sure that the
/// data is dropped correctly (using [`Absorb::drop_second`]).
///
/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
pub struct Taken<T: Absorb<O>, O> {
    inner: Option<Box<T>>,
    _marker: PhantomData<O>,
}

impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Taken")
            .field(
                "inner",
                self.inner
                    .as_ref()
                    .expect("inner is only taken in `into_box` which drops self"),
            )
            .finish()
    }
}

impl<T: Absorb<O>, O> Deref for Taken<T, O> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.inner
            .as_ref()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner
            .as_mut()
            .expect("inner is only taken in `into_box` which drops self")
    }
}

impl<T: Absorb<O>, O> Taken<T, O> {
    /// This is unsafe because you must call [`Absorb::drop_second`] in
    /// case just dropping `T` is not safe and sufficient.
    ///
    /// If you used the default implementation of [`Absorb::drop_second`] (which just calls [`drop`](Drop::drop))
    /// you don't need to call [`Absorb::drop_second`].
    pub unsafe fn into_box(mut self) -> Box<T> {
        self.inner
            .take()
            .expect("inner is only taken here then self is dropped")
    }
}

impl<T: Absorb<O>, O> Drop for Taken<T, O> {
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            T::drop_second(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
    ///
    /// Makes sure that all the pending operations are applied and waits until all the read
    /// handles have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of
    /// the data and returns the other copy as a [`Taken`] smart pointer.
    fn take_inner(&mut self) -> Option<Taken<T, O>> {
        use std::ptr;
        // Can only take inner once.
        if self.taken {
            return None;
        }

        // Disallow taking again.
        self.taken = true;

        // first, ensure both copies are up to date
        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to both w_handle and r_handle.
        // give the underlying data structure an opportunity to handle the one copy differently:
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        Some(Taken {
            inner: Some(boxed_r_handle),
            _marker: PhantomData,
        })
    }
}

impl<T, O> Drop for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    fn drop(&mut self) {
        if let Some(inner) = self.take_inner() {
            drop(inner);
        }
    }
}

impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
            first: true,
            second: true,
            taken: false,
        }
    }

    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even the last time we read it (which was _after_ the
                // swap), then they either do not have the pointer, or must have read the pointer
                // strictly after the swap. in either case, they cannot be using the old pointer
                // value (what is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Publish all operations appended to the log to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
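    ///
    /// For example, operations appended since the last publish only become visible to readers
    /// after this call (the `Counter` type in the hidden setup below is illustrative only, not
    /// part of this crate):
    ///
    /// ```rust
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # impl left_right::Absorb<i32> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut i32, _: &Self) { self.0 += *op; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
    /// w.append(1);
    /// w.append(2);
    /// assert_eq!(r.enter().map(|c| c.0), Some(0)); // not yet published
    /// w.publish();
    /// assert_eq!(r.enter().map(|c| c.0), Some(3)); // both operations are now visible
    /// ```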
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        if !self.first {
            // all the readers have left!
            // safety: we haven't freed the Box, and no readers are accessing the w_handle
            let w_handle = unsafe { self.w_handle.as_mut() };

            // safety: we will not swap while we hold this reference
            let r_handle = unsafe {
                self.r_handle
                    .inner
                    .load(Ordering::Acquire)
                    .as_ref()
                    .unwrap()
            };

            if self.second {
                Absorb::sync_with(w_handle, r_handle);
                self.second = false
            }

            // the w_handle copy has not seen any of the writes in the oplog
            // the r_handle copy has not seen any of the writes following swap_index
            if self.swap_index != 0 {
                // we can drain out the operations that only the w_handle copy needs
                //
                // NOTE: the if above is because drain(0..0) would remove 0 elements
                for op in self.oplog.drain(0..self.swap_index) {
                    T::absorb_second(w_handle, op, r_handle);
                }
            }
            // we cannot give owned operations to absorb_first
            // since they'll also be needed by the r_handle copy
            for op in self.oplog.iter_mut() {
                T::absorb_first(w_handle, op, r_handle);
            }
            // the w_handle copy is about to become the r_handle, and can ignore the oplog
            self.swap_index = self.oplog.len();

            // w_handle (the old r_handle) is now fully up to date!
        } else {
            self.first = false
        }

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of the atomic
        // pointer inside the ReadHandle. oplog contains all the changes that are in w_handle,
        // but not in r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the copies.
    /// This method will only do so if there are pending operations.
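    ///
    /// For example (the `Counter` type in the hidden setup below is illustrative only, not part
    /// of this crate):
    ///
    /// ```rust
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # impl left_right::Absorb<i32> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut i32, _: &Self) { self.0 += *op; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
    /// w.publish(); // after the first publish, appends go to the operational log
    /// w.append(1);
    /// assert!(w.has_pending_operations());
    /// w.flush(); // publishes, because there is a pending operation
    /// assert!(!w.has_pending_operations());
    /// assert_eq!(r.enter().map(|c| c.0), Some(1));
    /// w.flush(); // does nothing: no pending operations
    /// ```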
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
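    ///
    /// Since `append` returns `&mut Self`, calls can be chained (the `Counter` type in the
    /// hidden setup below is illustrative only, not part of this crate):
    ///
    /// ```rust
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # impl left_right::Absorb<i32> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut i32, _: &Self) { self.0 += *op; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
    /// w.append(1).append(2).publish();
    /// assert_eq!(r.enter().map(|c| c.0), Some(3));
    /// ```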
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all the pending operations are applied and waits until all the read
    /// handles have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of
    /// the data and returns the other copy as a [`Taken`] smart pointer.
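    ///
    /// For example (the `Counter` type in the hidden setup below is illustrative only, not part
    /// of this crate):
    ///
    /// ```rust
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # impl left_right::Absorb<i32> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut i32, _: &Self) { self.0 += *op; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, _r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
    /// w.append(1);
    /// w.publish();
    /// w.append(2); // still pending; `take` will publish it before taking
    /// let taken = w.take();
    /// assert_eq!(taken.0, 3);
    /// ```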
    pub fn take(mut self) -> Taken<T, O> {
        // It is always safe to `expect` here because `take_inner` is private
        // and it is only called here and in the drop impl. Since we have an owned
        // `self` we know the drop has not yet been called. And every first call of
        // `take_inner` returns `Some`
        self.take_inner()
            .expect("inner is only taken here then self is dropped")
    }
}

// allow using write handle for reads
use std::ops::Deref;
impl<T, O> Deref for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    type Target = ReadHandle<T>;
    fn deref(&self) -> &Self::Target {
        &self.r_handle
    }
}

impl<T, O> Extend<O> for WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Add multiple operations to the operational log.
    ///
    /// Their effects will not be exposed to readers until you call [`publish`](Self::publish).
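    ///
    /// For example (the `Counter` type in the hidden setup below is illustrative only, not part
    /// of this crate):
    ///
    /// ```rust
    /// # #[derive(Clone)]
    /// # struct Counter(i32);
    /// # impl left_right::Absorb<i32> for Counter {
    /// #     fn absorb_first(&mut self, op: &mut i32, _: &Self) { self.0 += *op; }
    /// #     fn sync_with(&mut self, first: &Self) { self.0 = first.0; }
    /// # }
    /// let (mut w, r) = left_right::new_from_empty::<Counter, i32>(Counter(0));
    /// w.extend(vec![1, 2, 3]);
    /// w.publish();
    /// assert_eq!(r.enter().map(|c| c.0), Some(6));
    /// ```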
    fn extend<I>(&mut self, ops: I)
    where
        I: IntoIterator<Item = O>,
    {
        if self.first {
            // Safety: we know there are no outstanding w_handle readers, since we haven't
            // refreshed ever before, so we can modify it directly!
            let mut w_inner = self.raw_write_handle();
            let w_inner = unsafe { w_inner.as_mut() };
            let r_handle = self.enter().expect("map has not yet been destroyed");
            // Because we are operating directly on the map, and nothing is aliased, we do want
            // to perform drops, so we invoke absorb_second.
            for op in ops {
                Absorb::absorb_second(w_inner, op, &*r_handle);
            }
        } else {
            self.oplog.extend(ops);
        }
    }
}

/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///     // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;

#[cfg(test)]
mod tests {
    use crate::sync::{AtomicUsize, Mutex, Ordering};
    use crate::Absorb;
    use slab::Slab;
    include!("./utilities.rs");

    #[test]
    fn append_test() {
        let (mut w, _r) = crate::new::<i32, _>();
        assert_eq!(w.first, true);
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 0);
        assert_eq!(w.first, true);
        w.publish();
        assert_eq!(w.first, false);
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 2);
    }

    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let (w, _r) = crate::new_from_empty::<i32, _>(2);
        assert_eq!(*w.take(), 2);
    }

    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: If epoch is set to default.
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there are no epochs to wait for, wait will return immediately.
        w.wait(&mut test_epochs);

        // Case 2: If one of the readers is still reading (its epoch is odd and unchanged from
        // last_epochs) and wait has been called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure the writer's wait() has been entered first; only then update the held epoch.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait must return after the progress/increment
        // of held_epoch.
        let _ = wait_handle.join();
    }

    #[test]
    fn flush_noblock() {
        let (mut w, r) = crate::new::<i32, _>();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    #[test]
    fn flush_no_refresh() {
        let (mut w, _) = crate::new::<i32, _>();

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle copy).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }
}
716}