left_right/write.rs
1use crate::read::ReadHandle;
2use crate::Absorb;
3
4use crate::sync::{fence, Arc, AtomicUsize, MutexGuard, Ordering};
5use std::collections::VecDeque;
6use std::marker::PhantomData;
7use std::ops::DerefMut;
8use std::ptr::NonNull;
9#[cfg(test)]
10use std::sync::atomic::AtomicBool;
11use std::{fmt, thread};
12
13/// A writer handle to a left-right guarded data structure.
14///
15/// All operations on the underlying data should be enqueued as operations of type `O` using
16/// [`append`](Self::append). The effect of this operations are only exposed to readers once
17/// [`publish`](Self::publish) is called.
18///
19/// # Reading through a `WriteHandle`
20///
21/// `WriteHandle` allows access to a [`ReadHandle`] through `Deref<Target = ReadHandle>`. Note that
22/// since the reads go through a [`ReadHandle`], those reads are subject to the same visibility
23/// restrictions as reads that do not go through the `WriteHandle`: they only see the effects of
24/// operations prior to the last call to [`publish`](Self::publish).
25pub struct WriteHandle<T, O>
26where
27 T: Absorb<O>,
28{
29 epochs: crate::Epochs,
30 w_handle: NonNull<T>,
31 oplog: VecDeque<O>,
32 swap_index: usize,
33 r_handle: ReadHandle<T>,
34 last_epochs: Vec<usize>,
35 #[cfg(test)]
36 refreshes: usize,
37 #[cfg(test)]
38 is_waiting: Arc<AtomicBool>,
39 /// Write directly to the write handle map, since no publish has happened.
40 first: bool,
41 /// A publish has happened, but the two copies have not been synchronized yet.
42 second: bool,
43 /// If we call `Self::take` the drop needs to be different.
44 taken: bool,
45}
46
47// safety: if a `WriteHandle` is sent across a thread boundary, we need to be able to take
48// ownership of both Ts and Os across that thread boundary. since `WriteHandle` holds a
49// `ReadHandle`, we also need to respect its Send requirements.
50unsafe impl<T, O> Send for WriteHandle<T, O>
51where
52 T: Absorb<O>,
53 T: Send,
54 O: Send,
55 ReadHandle<T>: Send,
56{
57}
58
59impl<T, O> fmt::Debug for WriteHandle<T, O>
60where
61 T: Absorb<O> + fmt::Debug,
62 O: fmt::Debug,
63{
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65 f.debug_struct("WriteHandle")
66 .field("epochs", &self.epochs)
67 .field("w_handle", &self.w_handle)
68 .field("oplog", &self.oplog)
69 .field("swap_index", &self.swap_index)
70 .field("r_handle", &self.r_handle)
71 .field("first", &self.first)
72 .field("second", &self.second)
73 .finish()
74 }
75}
76
77/// A **smart pointer** to an owned backing data structure. This makes sure that the
78/// data is dropped correctly (using [`Absorb::drop_second`]).
79///
80/// Additionally it allows for unsafely getting the inner data out using [`into_box()`](Taken::into_box).
81pub struct Taken<T: Absorb<O>, O> {
82 inner: Option<Box<T>>,
83 _marker: PhantomData<O>,
84}
85
86impl<T: Absorb<O> + std::fmt::Debug, O> std::fmt::Debug for Taken<T, O> {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("Taken")
89 .field(
90 "inner",
91 self.inner
92 .as_ref()
93 .expect("inner is only taken in `into_box` which drops self"),
94 )
95 .finish()
96 }
97}
98
99impl<T: Absorb<O>, O> Deref for Taken<T, O> {
100 type Target = T;
101
102 fn deref(&self) -> &Self::Target {
103 self.inner
104 .as_ref()
105 .expect("inner is only taken in `into_box` which drops self")
106 }
107}
108
109impl<T: Absorb<O>, O> DerefMut for Taken<T, O> {
110 fn deref_mut(&mut self) -> &mut Self::Target {
111 self.inner
112 .as_mut()
113 .expect("inner is only taken in `into_box` which drops self")
114 }
115}
116
117impl<T: Absorb<O>, O> Taken<T, O> {
118 /// # Safety
119 ///
120 /// You must call [`Absorb::drop_second`] in case just dropping `T` is not safe and sufficient.
121 ///
122 /// If you used the default implementation of [`Absorb::drop_second`] (which just calls
123 /// [`drop`](Drop::drop)) you don't need to call [`Absorb::drop_second`].
124 pub unsafe fn into_box(mut self) -> Box<T> {
125 self.inner
126 .take()
127 .expect("inner is only taken here then self is dropped")
128 }
129}
130
131impl<T: Absorb<O>, O> Drop for Taken<T, O> {
132 fn drop(&mut self) {
133 if let Some(inner) = self.inner.take() {
134 T::drop_second(inner);
135 }
136 }
137}
138
impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Takes out the inner backing data structure if it hasn't been taken yet. Otherwise returns `None`.
    ///
    /// Makes sure that all the pending operations are applied and waits till all the read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
    ///
    /// # Panics
    ///
    /// Panics if locking the epochs mutex fails, or if the oplog is still non-empty after the
    /// publishes below.
    fn take_inner(&mut self) -> Option<Taken<T, O>> {
        use std::ptr;
        // Can only take inner once.
        if self.taken {
            return None;
        }

        // Disallow taking again.
        self.taken = true;

        // first, ensure both copies are up to date
        // (otherwise safely dropping the possibly duplicated w_handle data is a pain)
        if self.first || !self.oplog.is_empty() {
            self.publish();
        }
        // a single publish leaves the just-published operations in the oplog (the other copy
        // still needs them); publishing once more drains them.
        if !self.oplog.is_empty() {
            self.publish();
        }
        assert!(self.oplog.is_empty());

        // next, grab the read handle and set it to NULL
        let r_handle = self.r_handle.inner.swap(ptr::null_mut(), Ordering::Release);

        // now, wait for all readers to depart
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();
        self.wait(&mut epochs);

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // all readers have now observed the NULL, so we own both handles.
        // all operations have been applied to both w_handle and r_handle.
        // give the underlying data structure an opportunity to handle the one copy differently:
        //
        // safety: w_handle was initially created from a `Box`, and is no longer aliased.
        Absorb::drop_first(unsafe { Box::from_raw(self.w_handle.as_ptr()) });

        // next we take the r_handle and return it as a boxed value.
        //
        // this is safe, since we know that no readers are using this pointer
        // anymore (due to the .wait() following swapping the pointer with NULL).
        //
        // safety: r_handle was initially created from a `Box`, and is no longer aliased.
        let boxed_r_handle = unsafe { Box::from_raw(r_handle) };

        Some(Taken {
            inner: Some(boxed_r_handle),
            _marker: PhantomData,
        })
    }
}
200
201impl<T, O> Drop for WriteHandle<T, O>
202where
203 T: Absorb<O>,
204{
205 fn drop(&mut self) {
206 if let Some(inner) = self.take_inner() {
207 drop(inner);
208 }
209 }
210}
211
impl<T, O> WriteHandle<T, O>
where
    T: Absorb<O>,
{
    /// Creates a write handle whose write copy is `w_handle` (moved onto the heap) and which
    /// reads through `r_handle`.
    ///
    /// The handle starts with an empty oplog and with both `first` and `second` set.
    pub(crate) fn new(w_handle: T, epochs: crate::Epochs, r_handle: ReadHandle<T>) -> Self {
        Self {
            epochs,
            // safety: Box<T> is not null and covariant.
            w_handle: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(w_handle))) },
            oplog: VecDeque::new(),
            swap_index: 0,
            r_handle,
            last_epochs: Vec::new(),
            #[cfg(test)]
            is_waiting: Arc::new(AtomicBool::new(false)),
            #[cfg(test)]
            refreshes: 0,
            first: true,
            second: true,
            taken: false,
        }
    }

    /// Blocks until every reader whose epoch was odd when it was last recorded in `last_epochs`
    /// has changed its epoch since then.
    ///
    /// Outside of loom, the first 20 retries spin; after that each retry yields the thread.
    /// Under loom every retry yields.
    fn wait(&mut self, epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>) {
        let mut iter = 0;
        let mut starti = 0;

        #[cfg(test)]
        {
            self.is_waiting.store(true, Ordering::Relaxed);
        }
        // we're over-estimating here, but slab doesn't expose its max index
        self.last_epochs.resize(epochs.capacity(), 0);
        'retry: loop {
            // read all and see if all have changed (which is likely)
            for (ii, (ri, epoch)) in epochs.iter().enumerate().skip(starti) {
                // if the reader's epoch was even last we read it (which was _after_ the swap),
                // then they either do not have the pointer, or must have read the pointer strictly
                // after the swap. in either case, they cannot be using the old pointer value (what
                // is now w_handle).
                //
                // note that this holds even with wrap-around since std::u{N}::MAX == 2 ^ N - 1,
                // which is odd, and std::u{N}::MAX + 1 == 0 is even.
                //
                // note also that `ri` _may_ have been re-used since we last read into last_epochs.
                // this is okay though, as a change still implies that the new reader must have
                // arrived _after_ we did the atomic swap, and thus must also have seen the new
                // pointer.
                if self.last_epochs[ri] % 2 == 0 {
                    continue;
                }

                let now = epoch.load(Ordering::Acquire);
                if now != self.last_epochs[ri] {
                    // reader must have seen the last swap, since they have done at least one
                    // operation since we last looked at their epoch, which _must_ mean that they
                    // are no longer using the old pointer value.
                } else {
                    // reader may not have seen swap
                    // continue from this reader's epoch
                    starti = ii;

                    if !cfg!(loom) {
                        // how eagerly should we retry?
                        if iter != 20 {
                            iter += 1;
                        } else {
                            thread::yield_now();
                        }
                    }

                    #[cfg(loom)]
                    loom::thread::yield_now();

                    continue 'retry;
                }
            }
            break;
        }
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
    }

    /// Try to publish once without waiting.
    ///
    /// This performs a single, non-blocking check of reader epochs. If all current readers have
    /// advanced since the last swap, it performs a publish and returns `true`. If any reader may
    /// still be accessing the old copy, it does nothing and returns `false`.
    ///
    /// Unlike [`publish`](Self::publish), this never spins or waits. Use it on latency-sensitive
    /// paths where skipping a publish is preferable to blocking; call again later or fall back to
    /// [`publish`](Self::publish) if you must ensure visibility.
    ///
    /// Returns `true` if a publish occurred, `false` otherwise.
    pub fn try_publish(&mut self) -> bool {
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        // This wait loop is exactly like the one in wait, except that if we find a reader that
        // has not observed the latest swap, we return rather than spin-and-retry.
        self.last_epochs.resize(epochs.capacity(), 0);
        for (ri, epoch) in epochs.iter() {
            if self.last_epochs[ri] % 2 == 0 {
                continue;
            }

            let now = epoch.load(Ordering::Acquire);
            if now != self.last_epochs[ri] {
                continue;
            } else {
                return false;
            }
        }
        // NOTE(review): this method never sets `is_waiting` to true, so this store only has an
        // effect if the flag was raised elsewhere; presumably it mirrors the end of `wait`.
        #[cfg(test)]
        {
            self.is_waiting.store(false, Ordering::Relaxed);
        }
        self.update_and_swap(&mut epochs);

        true
    }

    /// Publish all operations appended to the log to readers.
    ///
    /// This method needs to wait for all readers to move to the "other" copy of the data so that
    /// it can replay the operational log onto the stale copy the readers used to use. This can
    /// take some time, especially if readers are executing slow operations, or if there are many
    /// of them.
    pub fn publish(&mut self) -> &mut Self {
        // we need to wait until all epochs have changed since the swaps *or* until a "finished"
        // flag has been observed to be on for two subsequent iterations (there still may be some
        // readers present since we did the previous refresh)
        //
        // NOTE: it is safe for us to hold the lock for the entire duration of the swap. we will
        // only block on pre-existing readers, and they are never waiting to push onto epochs
        // unless they have finished reading.
        let epochs = Arc::clone(&self.epochs);
        let mut epochs = epochs.lock().unwrap();

        self.wait(&mut epochs);

        self.update_and_swap(&mut epochs)
    }

    /// Brings `w_handle` up to date with the oplog, then swaps `r_handle` and `w_handle`.
    ///
    /// This method must only be called when all readers have exited `w_handle` (e.g., after
    /// `wait`).
    ///
    /// After the swap, every reader's current epoch is recorded in `last_epochs` so that the
    /// next `wait`/`try_publish` can tell which readers have moved on.
    fn update_and_swap(
        &mut self,
        epochs: &mut MutexGuard<'_, slab::Slab<Arc<AtomicUsize>>>,
    ) -> &mut Self {
        if !self.first {
            // all the readers have left!
            // safety: we haven't freed the Box, and no readers are accessing the w_handle
            let w_handle = unsafe { self.w_handle.as_mut() };

            // safety: we will not swap while we hold this reference
            let r_handle = unsafe {
                self.r_handle
                    .inner
                    .load(Ordering::Acquire)
                    .as_ref()
                    .unwrap()
            };

            // on the first pass through this branch, give the write copy a chance to
            // synchronize itself with the copy readers currently see.
            if self.second {
                Absorb::sync_with(w_handle, r_handle);
                self.second = false
            }

            // the w_handle copy has not seen any of the writes in the oplog
            // the r_handle copy has not seen any of the writes following swap_index
            if self.swap_index != 0 {
                // we can drain out the operations that only the w_handle copy needs
                //
                // NOTE: the if above is because drain(0..0) would remove 0
                for op in self.oplog.drain(0..self.swap_index) {
                    T::absorb_second(w_handle, op, r_handle);
                }
            }
            // we cannot give owned operations to absorb_first
            // since they'll also be needed by the r_handle copy
            for op in self.oplog.iter_mut() {
                T::absorb_first(w_handle, op, r_handle);
            }
            // the w_handle copy is about to become the r_handle, and can ignore the oplog
            self.swap_index = self.oplog.len();

        // w_handle (the old r_handle) is now fully up to date!
        } else {
            // before the first publish all operations were applied to w_handle directly, so
            // there is nothing to replay.
            self.first = false
        }

        // at this point, we have exclusive access to w_handle, and it is up-to-date with all
        // writes. the stale r_handle is accessed by readers through an Arc clone of atomic pointer
        // inside the ReadHandle. oplog contains all the changes that are in w_handle, but not in
        // r_handle.
        //
        // it's now time for us to swap the copies so that readers see up-to-date results from
        // w_handle.

        // swap in our w_handle, and get r_handle in return
        let r_handle = self
            .r_handle
            .inner
            .swap(self.w_handle.as_ptr(), Ordering::Release);

        // NOTE: at this point, there are likely still readers using r_handle.
        // safety: r_handle was also created from a Box, so it is not null and is covariant.
        self.w_handle = unsafe { NonNull::new_unchecked(r_handle) };

        // ensure that the subsequent epoch reads aren't re-ordered to before the swap
        fence(Ordering::SeqCst);

        // remember where every reader was right after the swap. `last_epochs` was resized to
        // the slab's capacity by `wait`/`try_publish`, and the caller passes in the held guard.
        for (ri, epoch) in epochs.iter() {
            self.last_epochs[ri] = epoch.load(Ordering::Acquire);
        }

        #[cfg(test)]
        {
            self.refreshes += 1;
        }

        self
    }

    /// Publish as necessary to ensure that all operations are visible to readers.
    ///
    /// `WriteHandle::publish` will *always* wait for old readers to depart and swap the maps.
    /// This method will only do so if there are pending operations.
    pub fn flush(&mut self) {
        if self.has_pending_operations() {
            self.publish();
        }
    }

    /// Returns true if there are operations in the operational log that have not yet been exposed
    /// to readers.
    // NOTE(review): operations appended before the first publish are applied directly to the
    // write copy and never enter `oplog`, so they are not reported here (and `flush` will not
    // publish them) — confirm that this is intended.
    pub fn has_pending_operations(&self) -> bool {
        // NOTE: we don't use self.oplog.is_empty() here because it's not really that important if
        // there are operations that have not yet been applied to the _write_ handle.
        self.swap_index < self.oplog.len()
    }

    /// Append the given operation to the operational log.
    ///
    /// Its effects will not be exposed to readers until you call [`publish`](Self::publish).
    pub fn append(&mut self, op: O) -> &mut Self {
        self.extend(std::iter::once(op));
        self
    }

    /// Returns a raw pointer to the write copy of the data (the one readers are _not_ accessing).
    ///
    /// Note that it is only safe to mutate through this pointer if you _know_ that there are no
    /// readers still present in this copy. This is not normally something you know; even after
    /// calling `publish`, readers may still be in the write copy for some time. In general, the
    /// only time you know this is okay is before the first call to `publish` (since no readers
    /// ever entered the write copy).
    // TODO: Make this return `Option<&mut T>`,
    // and only `Some` if there are indeed no readers in the write copy.
    pub fn raw_write_handle(&mut self) -> NonNull<T> {
        self.w_handle
    }

    /// Returns the backing data structure.
    ///
    /// Makes sure that all the pending operations are applied and waits till all the read handles
    /// have departed. Then it uses [`Absorb::drop_first`] to drop one of the copies of the data and
    /// returns the other copy as a [`Taken`] smart pointer.
    pub fn take(mut self) -> Taken<T, O> {
        // It is always safe to `expect` here because `take_inner` is private
        // and it is only called here and in the drop impl. Since we have an owned
        // `self` we know the drop has not yet been called. And every first call of
        // `take_inner` returns `Some`
        self.take_inner()
            .expect("inner is only taken here then self is dropped")
    }
}
494
495// allow using write handle for reads
496use std::ops::Deref;
497impl<T, O> Deref for WriteHandle<T, O>
498where
499 T: Absorb<O>,
500{
501 type Target = ReadHandle<T>;
502 fn deref(&self) -> &Self::Target {
503 &self.r_handle
504 }
505}
506
507impl<T, O> Extend<O> for WriteHandle<T, O>
508where
509 T: Absorb<O>,
510{
511 /// Add multiple operations to the operational log.
512 ///
513 /// Their effects will not be exposed to readers until you call [`publish`](Self::publish)
514 fn extend<I>(&mut self, ops: I)
515 where
516 I: IntoIterator<Item = O>,
517 {
518 if self.first {
519 // Safety: we know there are no outstanding w_handle readers, since we haven't
520 // refreshed ever before, so we can modify it directly!
521 let mut w_inner = self.raw_write_handle();
522 let w_inner = unsafe { w_inner.as_mut() };
523 let r_handle = self.enter().expect("map has not yet been destroyed");
524 // Because we are operating directly on the map, and nothing is aliased, we do want
525 // to perform drops, so we invoke absorb_second.
526 for op in ops {
527 Absorb::absorb_second(w_inner, op, &*r_handle);
528 }
529 } else {
530 self.oplog.extend(ops);
531 }
532 }
533}
534
/// `WriteHandle` can be sent across thread boundaries:
///
/// ```
/// use left_right::WriteHandle;
///
/// struct Data;
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// As long as the inner types allow that of course.
/// Namely, the data type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data(Rc<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
///
/// .. the operation type has to be `Send`:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::rc::Rc;
///
/// struct Data;
/// impl left_right::Absorb<Rc<()>> for Data {
///     fn absorb_first(&mut self, _: &mut Rc<()>, _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, Rc<()>>>()
/// ```
///
/// .. and the data type has to be `Sync` so it's still okay to read through `ReadHandle`s:
///
/// ```compile_fail
/// use left_right::WriteHandle;
/// use std::cell::Cell;
///
/// struct Data(Cell<()>);
/// impl left_right::Absorb<()> for Data {
///     fn absorb_first(&mut self, _: &mut (), _: &Self) {}
///     fn sync_with(&mut self, _: &Self) {}
/// }
///
/// fn is_send<T: Send>() {
///   // dummy function just used for its parameterized type bound
/// }
///
/// is_send::<WriteHandle<Data, ()>>()
/// ```
// NOTE: every example above implements the same set of `Absorb` methods as the passing one
// (`absorb_first` and `sync_with`). A `compile_fail` example passes on *any* compile error, so
// an incomplete `Absorb` impl would make it pass without ever exercising the `Send`/`Sync`
// bound it is meant to check.
//
// This type is never constructed; it only exists as an anchor for the doctests above.
#[allow(dead_code)]
struct CheckWriteHandleSend;
609
#[cfg(test)]
mod tests {
    use crate::sync::{Arc, AtomicUsize, Mutex, Ordering};
    use crate::{read, Absorb};
    use slab::Slab;
    include!("./utilities.rs");

    // Before the first publish, appended operations bypass the oplog; afterwards they are logged.
    #[test]
    fn append_test() {
        let (mut w, _r) = crate::new::<i32, _>();
        assert_eq!(w.first, true);
        w.append(CounterAddOp(1));
        assert_eq!(w.oplog.len(), 0);
        assert_eq!(w.first, true);
        w.publish();
        assert_eq!(w.first, false);
        w.append(CounterAddOp(2));
        w.append(CounterAddOp(3));
        assert_eq!(w.oplog.len(), 2);
    }

    // `take` must return the fully up-to-date value regardless of how many publishes happened
    // and whether operations were still pending.
    #[test]
    fn take_test() {
        // publish twice then take with no pending operations
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 4);

        // publish twice then pending operation published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(2));
        assert_eq!(*w.take(), 6);

        // normal publish then pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 4);

        // pending operations published by take
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        assert_eq!(*w.take(), 3);

        // empty op queue
        let (mut w, _r) = crate::new_from_empty::<i32, _>(2);
        w.append(CounterAddOp(1));
        w.publish();
        assert_eq!(*w.take(), 3);

        // no operations
        let (w, _r) = crate::new_from_empty::<i32, _>(2);
        assert_eq!(*w.take(), 2);
    }

    // `wait` returns immediately when there is nothing to wait for, and otherwise only returns
    // once the held reader's epoch has changed.
    #[test]
    fn wait_test() {
        use std::sync::{Arc, Barrier};
        use std::thread;
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: If epoch is set to default.
        let test_epochs: crate::Epochs = Default::default();
        let mut test_epochs = test_epochs.lock().unwrap();
        // since there is no epoch to wait for, the wait function will return immediately.
        w.wait(&mut test_epochs);

        // Case 2: If one of the readers is still reading (epoch is odd and the count is the same
        // as in last_epochs) and wait has been called.
        let held_epoch = Arc::new(AtomicUsize::new(1));

        w.last_epochs = vec![2, 2, 1];
        let mut epochs_slab = Slab::new();
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::new(AtomicUsize::new(2)));
        epochs_slab.insert(Arc::clone(&held_epoch));

        let barrier = Arc::new(Barrier::new(2));

        let is_waiting = Arc::clone(&w.is_waiting);

        // check the writer's waiting state before calling wait.
        let is_waiting_v = is_waiting.load(Ordering::Relaxed);
        assert_eq!(false, is_waiting_v);

        let barrier2 = Arc::clone(&barrier);
        let test_epochs = Arc::new(Mutex::new(epochs_slab));
        let wait_handle = thread::spawn(move || {
            barrier2.wait();
            let mut test_epochs = test_epochs.lock().unwrap();
            w.wait(&mut test_epochs);
        });

        barrier.wait();

        // make sure that the writer's wait() is called first; only then allow the held epoch to
        // be updated.
        while !is_waiting.load(Ordering::Relaxed) {
            thread::yield_now();
        }

        held_epoch.fetch_add(1, Ordering::SeqCst);

        // join to make sure that wait must return after the progress/increment
        // of held_epoch.
        let _ = wait_handle.join();
    }

    // With nothing pending after a publish, there is no reason for `flush` to block even while
    // a reader is pinned.
    #[test]
    fn flush_noblock() {
        let (mut w, r) = crate::new::<i32, _>();
        w.append(CounterAddOp(42));
        w.publish();
        assert_eq!(*r.enter().unwrap(), 42);

        // pin the epoch
        let _count = r.enter();
        // refresh would hang here
        assert_eq!(w.oplog.iter().skip(w.swap_index).count(), 0);
        assert!(!w.has_pending_operations());
    }

    // Every `publish` performs a swap (counted by `refreshes`), whether or not operations are
    // pending.
    #[test]
    fn flush_no_refresh() {
        let (mut w, _) = crate::new::<i32, _>();

        // Until we refresh, writes are written directly instead of going to the
        // oplog (because there can't be any readers on the w_handle table).
        assert!(!w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 1);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 2);

        w.append(CounterAddOp(42));
        assert!(w.has_pending_operations());
        w.publish();
        assert!(!w.has_pending_operations());
        assert_eq!(w.refreshes, 3);

        // Sanity check that a refresh would have been visible
        assert!(!w.has_pending_operations());
        w.publish();
        assert_eq!(w.refreshes, 4);
    }

    // `try_publish` must refuse to publish while a reader is stuck on an odd, unchanged epoch,
    // and must publish once every reader has advanced.
    #[test]
    fn try_publish() {
        let (mut w, _r) = crate::new::<i32, _>();

        // Case 1: A reader has not advanced (odd and unchanged) -> returns false
        let mut epochs_slab = Slab::new();
        let idx = epochs_slab.insert(Arc::new(AtomicUsize::new(1))); // odd epoch, "in read"
                                                                     // Ensure last_epochs sees this reader as odd and unchanged
        w.last_epochs = vec![0; epochs_slab.capacity()];
        w.last_epochs[idx] = 1;
        w.epochs = Arc::new(Mutex::new(epochs_slab));
        assert_eq!(w.try_publish(), false);

        // Case 2: All readers have advanced since last swap -> returns true and publishes
        let mut epochs_slab_ok = Slab::new();
        let idx_ok = epochs_slab_ok.insert(Arc::new(AtomicUsize::new(2))); // advanced
        w.last_epochs = vec![0; epochs_slab_ok.capacity()];
        w.last_epochs[idx_ok] = 1; // previously odd
        w.epochs = Arc::new(Mutex::new(epochs_slab_ok));
        let before = w.refreshes;
        assert_eq!(w.try_publish(), true);
        assert_eq!(w.refreshes, before + 1);
    }
}