left_right/write.rs
1use crate::read::ReadHandle;
2use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
3use crate::Absorb;
4use crossbeam_utils::CachePadded;
5use std::collections::VecDeque;
6use std::marker::PhantomData;
7use std::ops::DerefMut;
8use std::ptr::NonNull;
9use std::{fmt, thread};
10
11#[cfg(test)]
12use std::sync::atomic::AtomicBool;
13
/// A writer handle to a left-right guarded data structure.
///
/// All operations on the underlying data should be enqueued as operations of type `O` using
/// [`append`](Self::append). The effects of these operations are only exposed to readers once
/// [`publish`](Self::publish) is called.
///
/// # Reading through a `WriteHandle`
///
/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
/// operations prior to the last call to [`publish`](Self::publish).
pub struct WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Shared, lockable registry of per-reader epoch counters, consulted while waiting for
    /// readers and right after every swap.
    epochs: crate::Epochs,
    /// Pointer to the copy of the data that readers are currently not directed to.
    w_handle: NonNull<T>,
    /// Operations appended after the first publish that have not yet been absorbed by both
    /// copies.
    oplog: VecDeque<O>,
    /// Number of leading `oplog` entries that the copy handed to readers at the last swap has
    /// already absorbed.
    swap_index: usize,
    /// Read handle whose atomic pointer is swapped on publish; also the `Deref` target.
    r_handle: ReadHandle<T>,
    /// Epoch of each reader as loaded right after the most recent swap, indexed by slab key.
    last_epochs: Vec<usize>,
    /// Number of swaps performed so far (test builds only).
    #[cfg(test)]
    refreshes: usize,
    /// Set while `wait` is running so tests can observe a blocked writer (test builds only).
    #[cfg(test)]
    is_waiting: Arc<AtomicBool>,
    /// Write directly to the write handle map, since no publish has happened.
    first: bool,
    /// A publish has happened, but the two copies have not been synchronized yet.
    second: bool,
    /// If we call `Self::take` the drop needs to be different.
    taken: bool,
}
47
48// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
49// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
50// `ReadHandle`, we also need to respect its Send requirements.
51unsafe impl<T, O> Send for WriteHandle<T, O>
52where
53 T: Absorb<O>,
54 T: Send,
55 O: Send,
56 ReadHandle<T>: Send,
57{
58}
59
60impl<T, O> fmt::Debug for WriteHandle<T, O>
61where
62 T: Absorb<O> + fmt::Debug,
63 O: fmt::Debug,
64{
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 f.debug_struct("WriteHandle")
67 .field("epochs", &self.epochs)
68 .field("w_handle", &self.w_handle)
69 .field("oplog", &self.oplog)
70 .field("swap_index", &self.swap_index)
71 .field("r_handle", &self.r_handle)
72 .field("first", &self.first)
73 .field("second", &self.second)
74 .finish()
75 }
76}
77
78/// A **smart pointer** to an owned backing data structure. This makes sure that the
79/// data is dropped correctly (using [`Absorb::drop_second`]).
80///
81/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
82pub struct Taken<T: Absorb<O>, O> {
83 inner: Option<Box<T>>,
84 _marker: PhantomData<O>,
85}
86
87impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
88 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89 f.debug_struct("Taken")
90 .field(
91 "inner",
92 self.inner
93 .as_ref()
94 .expect("inner is only taken in `into_box` which drops self"),
95 )
96 .finish()
97 }
98}
99
100impl<T: Absorb<O>, O> Deref for Taken<T, O> {
101 type Target = T;
102
103 fn deref(&self) -> &Self::Target {
104 self.inner
105 .as_ref()
106 .expect("inner is only taken in `into_box` which drops self")
107 }
108}
109
110impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
111 fn deref_mut(&mut self) -> &mut Self::Target {
112 self.inner
113 .as_mut()
114 .expect("inner is only taken in `into_box` which drops self")
115 }
116}
117
118impl<T: Absorb<O>, O> Taken<T, O> {
119 /// # Safety
120 ///
121 /// You must call [`Absorb::drop_second`] in case just dropping `T` is not safe and sufficient.
122 ///
123 /// If you used the default implementation of [`Absorb::drop_second`] (which just calls
124 /// [`drop`](Drop::drop)) you don't need to call [`Absorb::drop_second`].
125 pub unsafe fn into_box(mut self) -> Box<T> {
126 self.inner
127 .take()
128 .expect("inner is only taken here then self is dropped")
129 }
130}
131
132impl<T: Absorb<O>, O> Drop for Taken<T, O> {
133 fn drop(&mut self) {
134 if let Some(inner) = self.inner.take() {
135 T::drop_second(inner);
136 }
137 }
138}
139
impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
    ///
    /// Makes sure that all the pending operations are applied and waits till all the read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
    ///
    /// # Panics
    ///
    /// Panics if the epochs mutex is poisoned, or if the operation log is unexpectedly non-empty
    /// after publishing.
    fn take_inner(&mut self) -> Option<Taken<T, O>> {
        use std::ptr;
        // Can only take inner once.
        if self.taken {
            return None;
        }

        // Disallow taking again.
        self.taken = true;

        // first, ensure both copies are up to date
        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
        //
        // one publish applies the whole oplog to one copy; if operations remain that the other
        // copy has not absorbed yet, a second publish drains them.
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        //
        // NOTE(review): `wait` skips readers whose entry in `last_epochs` is even, and
        // `last_epochs` is only refreshed by `update_and_swap`, i.e. before the NULL swap above.
        // Confirm that a reader which entered after that refresh cannot still be using `r_handle`
        // when we reclaim it below.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to both w_handle and r_handle.
        // give the underlying data structure an opportunity to handle the one copy differently:
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        Some(Taken {
            inner: Some(boxed_r_handle),
            _marker: PhantomData,
        })
    }
}
201
202impl<T, O> Drop for WriteHandle<T, O>
203where
204 T: Absorb<O>,
205{
206 fn drop(&mut self) {
207 if let Some(inner) = self.take_inner() {
208 drop(inner);
209 }
210 }
211}
212
213impl<T, O> WriteHandle<T, O>
214where
215 T: Absorb<O>,
216{
217 pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
218 Self {
219 epochs,
220 // safety: Box<T> is not null and covariant.
221 w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
222 oplog: VecDeque::new(),
223 swap_index: 0,
224 r_handle,
225 last_epochs: Vec::new(),
226 #[cfg(test)]
227 is_waiting: Arc::new(AtomicBool::new(false)),
228 #[cfg(test)]
229 refreshes: 0,
230 first: true,
231 second: true,
232 taken: false,
233 }
234 }
235
/// Blocks until no reader can still be using the copy that is about to be written to.
///
/// A reader is treated as possibly still present when the epoch recorded for it in
/// `last_epochs` is odd and its current epoch is unchanged from that recorded value. Whenever
/// such a reader is found, the scan restarts from that reader. The first 20 restarts spin; every
/// later restart yields the thread (under `loom`, every restart yields).
///
/// `epochs` is the locked registry of per-reader epoch counters. `last_epochs` is resized to its
/// capacity before being indexed by slab key.
fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<CachePadded<AtomicUsize>>>>) {
    // number of spinning retries so far (stops counting at 20)
    let mut iter = 0;
    // iteration position from which the next scan resumes
    let mut starti = 0;

    #[cfg(test)]
    {
        self.is_waiting.store(true, Ordering::Relaxed);
    }
    // we're over-estimating here, but slab doesn't expose its max index
    self.last_epochs.resize(epochs.capacity(), 0);
    'retry: loop {
        // read all and see if all have changed (which is likely)
        for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
            // if the reader's epoch was even last we read it (which was _after_ the swap),
            // then they either do not have the pointer, or must have read the pointer strictly
            // after the swap. in either case, they cannot be using the old pointer value (what
            // is now w_handle).
            //
            // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
            // which is odd, and std::u{N}::MAX + 1 == 0 is even.
            //
            // note also that `ri` _may_ have been re-used since we last read into last_epochs.
            // this is okay though, as a change still implies that the new reader must have
            // arrived _after_ we did the atomic swap, and thus must also have seen the new
            // pointer.
            if self.last_epochs[ri] % 2 == 0 {
                continue;
            }

            let now = epoch.load(Ordering::Acquire);
            if now != self.last_epochs[ri] {
                // reader must have seen the last swap, since they have done at least one
                // operation since we last looked at their epoch, which _must_ mean that they
                // are no longer using the old pointer value.
            } else {
                // reader may not have seen swap
                // continue from this reader's epoch
                starti = ii;

                if !cfg!(loom) {
                    // how eagerly should we retry?
                    if iter != 20 {
                        iter += 1;
                    } else {
                        thread::yield_now();
                    }
                }

                #[cfg(loom)]
                loom::thread::yield_now();

                continue 'retry;
            }
        }
        // a full pass found no reader that could still be in the old copy
        break;
    }
    #[cfg(test)]
    {
        self.is_waiting.store(false, Ordering::Relaxed);
    }
}
297
298 /// Try to publish once without waiting.
299 ///
300 /// This performs a single, non-blocking check of reader epochs. If all current readers have
301 /// advanced since the last swap, it performs a publish and returns `true`. If any reader may
302 /// still be accessing the old copy, it does nothing and returns `false`.
303 ///
304 /// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
305 /// paths where skipping a publish is preferable to blocking; call again later or fall back to
306 /// [`publish`](Self::publish) if you must ensure visibility.
307 ///
308 /// Returns `true` if a publish occurred, `false` otherwise.
309 pub fn try_publish(&mut self) -> bool {
310 let epochs = Arc::clone(&self.epochs);
311 let mut epochs = epochs.lock().unwrap();
312
313 // This wait loop is exactly like the one in wait, except that if we find a reader that
314 // has not observed the latest swap, we return rather than spin-and-retry.
315 self.last_epochs.resize(epochs.capacity(), 0);
316 for (ri, epoch) in epochs.iter() {
317 if self.last_epochs[ri] % 2 == 0 {
318 continue;
319 }
320
321 let now = epoch.load(Ordering::Acquire);
322 if now != self.last_epochs[ri] {
323 continue;
324 } else {
325 return false;
326 }
327 }
328 #[cfg(test)]
329 {
330 self.is_waiting.store(false, Ordering::Relaxed);
331 }
332 self.update_and_swap(&mut epochs);
333
334 true
335 }
336
337 /// Publish all operations append to the log to reads.
338 ///
339 /// This method needs to wait for all readers to move to the "other" copy of the data so that
340 /// it can replay the operational log onto the stale copy the readers used to use. This can
341 /// take some time, especially if readers are executing slow operations, or if there are many
342 /// of them.
343 pub fn publish(&mut self) -> &mut Self {
344 // we need to wait until all epochs have changed since the swaps *or* until a "finished"
345 // flag has been observed to be on for two subsequent iterations (there still may be some
346 // readers present since we did the previous refresh)
347 //
348 // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
349 // only block on pre-existing readers, and they are never waiting to push onto epochs
350 // unless they have finished reading.
351 let epochs = Arc::clone(&self.epochs);
352 let mut epochs = epochs.lock().unwrap();
353
354 self.wait(&mut epochs);
355
356 self.update_and_swap(&mut epochs)
357 }
358
/// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
///
/// This method must only be called when all readers have exited `w_handle` (e.g., after
/// `wait`).
///
/// On the very first call (`first` still set) no operations are replayed; the flag is cleared
/// and the copies are swapped. After the swap, the current epoch of every reader in `epochs` is
/// recorded in `last_epochs`, which the caller must already have sized to cover every slab key.
///
/// Returns `self` to allow chaining.
fn update_and_swap(
    &mut self,
    epochs: &mut MutexGuard<'_, slab::Slab<Arc<CachePadded<AtomicUsize>>>>,
) -> &mut Self {
    if !self.first {
        // all the readers have left!
        // safety: we haven't freed the Box, and no readers are accessing the w_handle
        let w_handle = unsafe { self.w_handle.as_mut() };

        // safety: we will not swap while we hold this reference
        let r_handle = unsafe {
            self.r_handle
                .inner
                .load(Ordering::Acquire)
                .as_ref()
                .unwrap()
        };

        // the first time we get here after the initial publish, the write copy is synchronized
        // with the read copy before any operations are replayed
        if self.second {
            Absorb::sync_with(w_handle, r_handle);
            self.second = false
        }

        // the w_handle copy has not seen any of the writes in the oplog
        // the r_handle copy has not seen any of the writes following swap_index
        if self.swap_index != 0 {
            // we can drain out the operations that only the w_handle copy needs
            //
            // NOTE: the if above is because drain(0..0) would remove 0
            for op in self.oplog.drain(0..self.swap_index) {
                T::absorb_second(w_handle, op, r_handle);
            }
        }
        // we cannot give owned operations to absorb_first
        // since they'll also be needed by the r_handle copy
        for op in self.oplog.iter_mut() {
            T::absorb_first(w_handle, op, r_handle);
        }
        // the w_handle copy is about to become the r_handle, and can ignore the oplog
        self.swap_index = self.oplog.len();

        // w_handle (the old r_handle) is now fully up to date!
    } else {
        self.first = false
    }

    // at this point, we have exclusive access to w_handle, and it is up-to-date with all
    // writes. the stale r_handle is accessed by readers through an Arc clone of atomic pointer
    // inside the ReadHandle. oplog contains all the changes that are in w_handle, but not in
    // r_handle.
    //
    // it's now time for us to swap the copies so that readers see up-to-date results from
    // w_handle.

    // swap in our w_handle, and get r_handle in return
    let r_handle = self
        .r_handle
        .inner
        .swap(self.w_handle.as_ptr(), Ordering::Release);

    // NOTE: at this point, there are likely still readers using r_handle.
    // safety: r_handle was also created from a Box, so it is not null and is covariant.
    self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

    // ensure that the subsequent epoch reads aren't re-ordered to before the swap
    fence(Ordering::SeqCst);

    // record every reader's epoch as of just after the swap; `wait` and `try_publish` compare
    // against these values
    for (ri, epoch) in epochs.iter() {
        self.last_epochs[ri] = epoch.load(Ordering::Acquire);
    }

    #[cfg(test)]
    {
        self.refreshes += 1;
    }

    self
}
441
442 /// Publish as necessary to ensure that all operations are visible to readers.
443 ///
444 /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the maps.
445 /// This method will only do so if there are pending operations.
446 pub fn flush(&mut self) {
447 if self.has_pending_operations() {
448 self.publish();
449 }
450 }
451
452 /// Returns true if there are operations in the operational log that have not yet been exposed
453 /// to readers.
454 pub fn has_pending_operations(&self) -> bool {
455 // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
456 // there are operations that have not yet been applied to the _write_ handle.
457 self.swap_index < self.oplog.len()
458 }
459
460 /// Append the given operation to the operational log.
461 ///
462 /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
463 pub fn append(&mut self, op: O) -> &mut Self {
464 self.extend(std::iter::once(op));
465 self
466 }
467
468 /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
469 ///
470 /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
471 /// readers still present in this copy. This is not normally something you know; even after
472 /// calling `publish`, readers may still be in the write copy for some time. In general, the
473 /// only time you know this is okay is before the first call to `publish` (since no readers
474 /// ever entered the write copy).
475 // TODO: Make this return `Option<&mut T>`,
476 // and only `Some` if there are indeed to readers in the write copy.
477 pub fn raw_write_handle(&mut self) -> NonNull<T> {
478 self.w_handle
479 }
480
481 /// Returns the backing data structure.
482 ///
483 /// Makes sure that all the pending operations are applied and waits till all the read handles
484 /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
485 /// returns the other copy as a [`Taken`] smart pointer.
486 pub fn take(mut self) -> Taken<T, O> {
487 // It is always safe to `expect` here because `take_inner` is private
488 // and it is only called here and in the drop impl. Since we have an owned
489 // `self` we know the drop has not yet been called. And every first call of
490 // `take_inner` returns `Some`
491 self.take_inner()
492 .expect("inner is only taken here then self is dropped")
493 }
494}
495
496// allow using write handle for reads
497use std::ops::Deref;
498impl<T, O> Deref for WriteHandle<T, O>
499where
500 T: Absorb<O>,
501{
502 type Target = ReadHandle<T>;
503 fn deref(&self) -> &Self::Target {
504 &self.r_handle
505 }
506}
507
508impl<T, O> Extend<O> for WriteHandle<T, O>
509where
510 T: Absorb<O>,
511{
512 /// Add multiple operations to the operational log.
513 ///
514 /// Their effects will not be exposed to readers until you call [`publish`](Self::publish)
515 fn extend<I>(&mut self, ops: I)
516 where
517 I: IntoIterator<Item = O>,
518 {
519 if self.first {
520 // Safety: we know there are no outstanding w_handle readers, since we haven't
521 // refreshed ever before, so we can modify it directly!
522 let mut w_inner = self.raw_write_handle();
523 let w_inner = unsafe { w_inner.as_mut() };
524 let r_handle = self.enter().expect("map has not yet been destroyed");
525 // Because we are operating directly on the map, and nothing is aliased, we do want
526 // to perform drops, so we invoke absorb_second.
527 for op in ops {
528 Absorb::absorb_second(w_inner, op, &*r_handle);
529 }
530 } else {
531 self.oplog.extend(ops);
532 }
533 }
534}
535
/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
#[allow(dead_code)]
struct CheckWriteHandleSend;
610
611#[cfg(test)]
612mod tests {
613 use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
614 use crate::{read, Absorb};
615 use slab::Slab;
616 include!("./utilities.rs");
617
#[test]
fn append_test() {
    let (mut w, _r) = crate::new::<i32, _>();
    assert!(w.first);

    // before the first publish nothing is logged and the handle stays in its `first` state
    w.append(CounterAddOp(1));
    assert!(w.oplog.is_empty());
    assert!(w.first);

    w.publish();
    assert!(!w.first);

    // after the first publish, appended operations are queued in the oplog
    w.append(CounterAddOp(2)).append(CounterAddOp(3));
    assert_eq!(w.oplog.len(), 2);
}
631
#[test]
fn take_test() {
    // publish twice then take with no pending operations
    let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
    w.append(CounterAddOp(1)).publish();
    w.append(CounterAddOp(1)).publish();
    assert_eq!(*w.take(), 4);

    // publish twice then pending operation published by take
    let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
    w.append(CounterAddOp(1)).publish();
    w.append(CounterAddOp(1)).publish();
    w.append(CounterAddOp(2));
    assert_eq!(*w.take(), 6);

    // normal publish then pending operations published by take
    let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
    w.append(CounterAddOp(1)).publish();
    w.append(CounterAddOp(1));
    assert_eq!(*w.take(), 4);

    // pending operations published by take
    let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
    w.append(CounterAddOp(1));
    assert_eq!(*w.take(), 3);

    // empty op queue
    let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
    w.append(CounterAddOp(1)).publish();
    assert_eq!(*w.take(), 3);

    // no operations
    let (w, _r) = crate::new_from_empty::<i32, _>(2);
    assert_eq!(*w.take(), 2);
}
673
#[test]
fn wait_test() {
    use crossbeam_utils::CachePadded;
    use std::sync::{Arc, Barrier};
    use std::thread;
    let (mut w, _r) = crate::new::<i32, _>();

    // Case 1: If epoch is set to default.
    let test_epochs: crate::Epochs = Default::default();
    let mut test_epochs = test_epochs.lock().unwrap();
    // since there is no epoch to wait for, the wait function will return immediately.
    w.wait(&mut test_epochs);

    // Case 2: If one of the readers is still reading (epoch is odd and count is the same as in
    // last_epochs) and wait has been called.
    //
    // the counters are wrapped in `CachePadded` because that is the element type of the slab
    // that `wait` accepts.
    let held_epoch = Arc::new(CachePadded::new(AtomicUsize::new(1)));

    w.last_epochs = vec![2, 2, 1];
    let mut epochs_slab = Slab::new();
    epochs_slab.insert(Arc::new(CachePadded::new(AtomicUsize::new(2))));
    epochs_slab.insert(Arc::new(CachePadded::new(AtomicUsize::new(2))));
    epochs_slab.insert(Arc::clone(&held_epoch));

    let barrier = Arc::new(Barrier::new(2));

    let is_waiting = Arc::clone(&w.is_waiting);

    // check the writer's waiting state before calling wait.
    assert!(!is_waiting.load(Ordering::Relaxed));

    let barrier2 = Arc::clone(&barrier);
    let test_epochs = Arc::new(Mutex::new(epochs_slab));
    let wait_handle = thread::spawn(move || {
        barrier2.wait();
        let mut test_epochs = test_epochs.lock().unwrap();
        w.wait(&mut test_epochs);
    });

    barrier.wait();

    // make sure that the writer has entered wait() before the held epoch is allowed to advance.
    while !is_waiting.load(Ordering::Relaxed) {
        thread::yield_now();
    }

    held_epoch.fetch_add(1, Ordering::SeqCst);

    // join to make sure that wait returns after the progress/increment of held_epoch, and
    // surface any panic from the writer thread instead of silently discarding it.
    wait_handle
        .join()
        .expect("writer thread panicked while waiting");
}
725
#[test]
fn flush_noblock() {
    let (mut w, r) = crate::new::<i32, _>();
    w.append(CounterAddOp(42)).publish();
    assert_eq!(*r.enter().unwrap(), 42);

    // pin the epoch
    let _count = r.enter();
    // refresh would hang here
    let unpublished = w.oplog.len().saturating_sub(w.swap_index);
    assert_eq!(unpublished, 0);
    assert!(!w.has_pending_operations());
}
739
#[test]
fn flush_no_refresh() {
    let (mut w, _) = crate::new::<i32, _>();

    // Until we refresh, writes are written directly instead of going to the
    // oplog (because there can't be any readers on the w_handle table).
    assert!(!w.has_pending_operations());
    w.publish();
    assert!(!w.has_pending_operations());
    assert_eq!(w.refreshes, 1);

    // two identical rounds: an append becomes pending, and a publish clears it
    for expected_refreshes in 2..=3 {
        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, expected_refreshes);
    }

    // Sanity check that a refresh would have been visible
    assert!(!w.has_pending_operations());
    w.publish();
    assert_eq!(w.refreshes, 4);
}
768
#[test]
fn try_publish() {
    use crossbeam_utils::CachePadded;

    let (mut w, _r) = crate::new::<i32, _>();

    // Case 1: A reader has not advanced (odd and unchanged) -> returns false
    //
    // the counters are wrapped in `CachePadded` because that is the element type of the slab
    // that the write handle's epoch registry holds.
    let mut epochs_slab = Slab::new();
    // odd epoch, "in read"
    let idx = epochs_slab.insert(Arc::new(CachePadded::new(AtomicUsize::new(1))));
    // Ensure last_epochs sees this reader as odd and unchanged
    w.last_epochs = vec![0; epochs_slab.capacity()];
    w.last_epochs[idx] = 1;
    w.epochs = Arc::new(Mutex::new(epochs_slab));
    assert!(!w.try_publish());

    // Case 2: All readers have advanced since last swap -> returns true and publishes
    let mut epochs_slab_ok = Slab::new();
    // advanced
    let idx_ok = epochs_slab_ok.insert(Arc::new(CachePadded::new(AtomicUsize::new(2))));
    w.last_epochs = vec![0; epochs_slab_ok.capacity()];
    w.last_epochs[idx_ok] = 1; // previously odd
    w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
    let before = w.refreshes;
    assert!(w.try_publish());
    assert_eq!(w.refreshes, before + 1);
}
791 }
792}