ccl_crossbeam_epoch/internal.rs
1//! The global data and participant for garbage collection.
2//!
3//! # Registration
4//!
5//! In order to track all participants in one place, we need some form of participant
6//! registration. When a participant is created, it is registered to a global lock-free
7//! singly-linked list of registries; and when a participant is leaving, it is unregistered from the
8//! list.
9//!
10//! # Pinning
11//!
12//! Every participant contains an integer that tells whether the participant is pinned and if so,
13//! what was the global epoch at the time it was pinned. Participants also hold a pin counter that
14//! aids in periodic global epoch advancement.
15//!
16//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
17//! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
18//!
19//! # Thread-local bag
20//!
21//! Objects that get unlinked from concurrent data structures must be stashed away until the global
22//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
23//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
24//! global epoch and pushed into the global queue of bags. We store objects in thread-local storages
25//! for amortizing the synchronization cost of pushing the garbages to a global queue.
26//!
27//! # Global queue
28//!
29//! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and
30//! destroyed along the way. This design reduces contention on data structures. The global queue
31//! cannot be explicitly accessed: the only way to interact with it is by calling functions
//! `defer()` that adds an object to the thread-local bag, or `collect()` that manually triggers
33//! garbage collection.
34//!
35//! Ideally each instance of concurrent data structure may have its own queue that gets fully
36//! destroyed as soon as the data structure gets dropped.
37
38use core::cell::{Cell, UnsafeCell};
39use core::mem::{self, ManuallyDrop};
40use core::num::Wrapping;
41use core::ptr;
42use core::sync::atomic;
43use core::sync::atomic::Ordering;
44
45use arrayvec::ArrayVec;
46use crossbeam_utils::CachePadded;
47
48use atomic::Owned;
49use collector::{Collector, LocalHandle};
50use deferred::Deferred;
51use epoch::{AtomicEpoch, Epoch};
52use guard::{unprotected, Guard};
53use sync::list::{Entry, IsElement, IterError, List};
54use sync::queue::Queue;
55
/// Maximum number of objects a bag can contain.
///
/// Regular builds (without the `sanitize` feature) use 64 slots per bag.
#[cfg(not(feature = "sanitize"))]
const MAX_OBJECTS: usize = 64;
/// Maximum number of objects a bag can contain when the `sanitize` feature is enabled.
///
/// NOTE(review): the capacity is only 4 here, presumably so that bags fill up (and are pushed to
/// the global queue) after very few deferrals when running under sanitizers — confirm intent.
#[cfg(feature = "sanitize")]
const MAX_OBJECTS: usize = 4;
61
/// A bag of deferred functions.
///
/// A bag stores at most `MAX_OBJECTS` deferred functions. Functions that are still in the bag
/// when it is dropped are executed by the bag's `Drop` implementation.
#[derive(Default, Debug)]
pub struct Bag {
    /// Stashed objects.
    deferreds: ArrayVec<[Deferred; MAX_OBJECTS]>,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
///
/// That contract on the only insertion path is what this `Send` implementation relies on.
unsafe impl Send for Bag {}
71
72impl Bag {
73 /// Returns a new, empty bag.
74 pub fn new() -> Self {
75 Self::default()
76 }
77
78 /// Returns `true` if the bag is empty.
79 pub fn is_empty(&self) -> bool {
80 self.deferreds.is_empty()
81 }
82
83 /// Attempts to insert a deferred function into the bag.
84 ///
85 /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
86 /// full.
87 ///
88 /// # Safety
89 ///
90 /// It should be safe for another thread to execute the given function.
91 pub unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
92 self.deferreds.try_push(deferred).map_err(|e| e.element())
93 }
94
95 /// Seals the bag with the given epoch.
96 fn seal(self, epoch: Epoch) -> SealedBag {
97 SealedBag { epoch, bag: self }
98 }
99}
100
101impl Drop for Bag {
102 fn drop(&mut self) {
103 // Call all deferred functions.
104 for deferred in self.deferreds.drain(..) {
105 deferred.call();
106 }
107 }
108}
109
/// A pair of an epoch and a bag.
///
/// Produced by `Bag::seal()`. `Global::push_bag()` seals a bag with the global epoch it loads
/// right before pushing the bag into the global queue.
#[derive(Default, Debug)]
struct SealedBag {
    /// The epoch the bag was sealed with.
    epoch: Epoch,
    /// The deferred functions waiting for that epoch to expire.
    bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}
119
120impl SealedBag {
121 /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
122 fn is_expired(&self, global_epoch: Epoch) -> bool {
123 // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
124 // is within one epoch of the current one cannot be destroyed yet.
125 global_epoch.wrapping_sub(self.epoch) >= 2
126 }
127}
128
/// The global data for a garbage collector.
///
/// Holds the list of registered participants (`Local`s), the queue of sealed bags awaiting
/// destruction, and the global epoch.
pub struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    ///
    /// NOTE(review): wrapped in `CachePadded`, presumably to keep this frequently read value
    /// from sharing a cache line with the other fields — confirm.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}
140
141impl Global {
142 /// Number of bags to destroy.
143 const COLLECT_STEPS: usize = 8;
144
145 /// Creates a new global data for garbage collection.
146 #[inline]
147 pub fn new() -> Self {
148 Self {
149 locals: List::new(),
150 queue: Queue::new(),
151 epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
152 }
153 }
154
155 /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
156 pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
157 let bag = mem::replace(bag, Bag::new());
158
159 atomic::fence(Ordering::SeqCst);
160
161 let epoch = self.epoch.load(Ordering::Relaxed);
162 self.queue.push(bag.seal(epoch), guard);
163 }
164
165 /// Collects several bags from the global queue and executes deferred functions in them.
166 ///
167 /// Note: This may itself produce garbage and in turn allocate new bags.
168 ///
169 /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
170 /// path. In other words, we want the compiler to optimize branching for the case when
171 /// `collect()` is not called.
172 #[cold]
173 pub fn collect(&self, guard: &Guard) {
174 let global_epoch = self.try_advance(guard);
175
176 let steps = if cfg!(feature = "sanitize") {
177 usize::max_value()
178 } else {
179 Self::COLLECT_STEPS
180 };
181
182 for _ in 0..steps {
183 match self.queue.try_pop_if(
184 &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
185 guard,
186 ) {
187 None => break,
188 Some(sealed_bag) => drop(sealed_bag),
189 }
190 }
191 }
192
193 /// Attempts to advance the global epoch.
194 ///
195 /// The global epoch can advance only if all currently pinned participants have been pinned in
196 /// the current epoch.
197 ///
198 /// Returns the current global epoch.
199 ///
200 /// `try_advance()` is annotated `#[cold]` because it is rarely called.
201 #[cold]
202 pub fn try_advance(&self, guard: &Guard) -> Epoch {
203 let global_epoch = self.epoch.load(Ordering::Relaxed);
204 atomic::fence(Ordering::SeqCst);
205
206 // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
207 // easy to implement in a lock-free manner. However, traversal can be slow due to cache
208 // misses and data dependencies. We should experiment with other data structures as well.
209 for local in self.locals.iter(&guard) {
210 match local {
211 Err(IterError::Stalled) => {
212 // A concurrent thread stalled this iteration. That thread might also try to
213 // advance the epoch, in which case we leave the job to it. Otherwise, the
214 // epoch will not be advanced.
215 return global_epoch;
216 }
217 Ok(local) => {
218 let local_epoch = local.epoch.load(Ordering::Relaxed);
219
220 // If the participant was pinned in a different epoch, we cannot advance the
221 // global epoch just yet.
222 if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
223 return global_epoch;
224 }
225 }
226 }
227 }
228 atomic::fence(Ordering::Acquire);
229
230 // All pinned participants were pinned in the current global epoch.
231 // Now let's advance the global epoch...
232 //
233 // Note that if another thread already advanced it before us, this store will simply
234 // overwrite the global epoch with the same value. This is true because `try_advance` was
235 // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
236 // advanced two steps ahead of it.
237 let new_epoch = global_epoch.successor();
238 self.epoch.store(new_epoch, Ordering::Release);
239 new_epoch
240 }
241}
242
/// Participant for garbage collection.
///
/// A `Local` is registered in the `Global`'s intrusive list by `Local::register()` and removes
/// itself from that list in `finalize()` once it has neither guards nor handles left.
pub struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    ///
    /// Holds `Epoch::starting()` while unpinned; `pin()` stores the pinned global epoch here.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}
270
impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the `Global` of the provided `Collector`.
    ///
    /// The new participant starts unpinned, with an empty bag, no guards and exactly one handle
    /// (the returned `LocalHandle`). It keeps its own clone of `collector`.
    pub fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            })
            .into_shared(&unprotected());
            collector.global.locals.insert(local, &unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub fn collector(&self) -> &Collector {
        // NOTE(review): relies on the `collector` field not having been taken out by
        // `finalize()` yet — confirm no caller can reach this afterwards.
        unsafe { &**self.collector.get() }
    }

    /// Returns `true` if the current participant is pinned.
    ///
    /// A participant counts as pinned while at least one guard exists for it.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// Whenever the bag is full, it is pushed into the global queue (and replaced with an empty
    /// one) before the insertion is retried.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = &mut *self.bag.get();

        // `try_push` hands the function back on failure, so it can be retried after flushing.
        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

    /// Flushes the thread-local bag and runs a round of garbage collection.
    ///
    /// A non-empty local bag is pushed into the global queue; afterwards `Global::collect()` is
    /// called regardless of whether anything was pushed.
    pub fn flush(&self, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    ///
    /// Returns a `Guard` for this participant and increments the guard count. Only the
    /// transition from zero guards to one publishes the local epoch and bumps the pin counter;
    /// nested pins just return another guard.
    ///
    /// # Panics
    ///
    /// Panics if the guard count overflows `usize`.
    #[inline]
    pub fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_and_swap(_, _, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case.  It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let previous = self
                    .epoch
                    .compare_and_swap(current, new_epoch, Ordering::SeqCst);
                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here.  Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
            // some garbage. The check uses the counter value from before the increment, so the
            // very first pinning also triggers a collection.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    ///
    /// Decrements the guard count. When the last guard goes away, the local epoch is reset to
    /// `Epoch::starting()` (the unpinned state) and, if no handles remain either, the
    /// participant is finalized.
    #[inline]
    pub fn unpin(&self) {
        let guard_count = self.guard_count.get();
        // NOTE(review): assumes `guard_count >= 1`, i.e. every `unpin` matches an earlier `pin`.
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    ///
    /// Has an effect only when exactly one guard exists; the guard count itself is not changed.
    #[inline]
    pub fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the (pinned) global epoch differs from the local
            // epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    ///
    /// If this was the last handle and no guard is alive, the participant is finalized.
    #[inline]
    pub fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        // `handle_count` is the value from before the decrement, so `1` means "now zero".
        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    ///
    /// Must only be called once both the guard count and the handle count have reached zero.
    /// The local bag is pushed into the global queue first, then the list entry is marked as
    /// deleted and this participant's reference to the collector is dropped.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global().push_bag(&mut *self.bag.get(), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(&*(*self.collector.get()));

            // Mark this node in the linked list as deleted.
            self.entry.delete(&unprotected());

            // Finally, drop the reference to the global.  Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}
490
impl IsElement<Local> for Local {
    /// Returns the list `Entry` embedded in `local`.
    ///
    /// The entry's address is computed as the address of `local` plus the offset of its `entry`
    /// field.
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    /// Recovers the `Local` that contains `entry`.
    ///
    /// This is the inverse of `entry_of`: the offset of the `entry` field is subtracted from the
    /// entry's address.
    ///
    /// # Safety
    ///
    /// `entry` must be the `entry` field of a `Local`; otherwise the computed pointer does not
    /// refer to a valid `Local`.
    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    /// Destroys the `Local` that contains `entry` by reconstructing its `Owned` and dropping it.
    ///
    /// # Safety
    ///
    /// In addition to the requirements of `element_of`, the `Local` must have been allocated
    /// through `Owned` (as `Local::register()` does).
    /// NOTE(review): presumably it must also no longer be reachable by any other thread —
    /// confirm against the contract of `List`.
    unsafe fn finalize(entry: &Entry) {
        let local = Self::element_of(entry);
        drop(Owned::from_raw(local as *const Local as *mut Local));
    }
}
509
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// A `Deferred` must not run when it is created, only when it is called.
    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn mark() {
            FLAG.store(42, Ordering::Relaxed);
        }
        let observed = || FLAG.load(Ordering::Relaxed);

        let deferred = Deferred::new(mark);
        assert_eq!(observed(), 0);
        deferred.call();
        assert_eq!(observed(), 42);
    }

    /// A bag accepts exactly `MAX_OBJECTS` functions, rejects the next one, and executes all
    /// stored functions only when it is dropped.
    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn bump() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }
        let observed = || FLAG.load(Ordering::Relaxed);

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        // Fill the bag up to its capacity; nothing may run yet.
        for _ in 0..MAX_OBJECTS {
            let pushed = unsafe { bag.try_push(Deferred::new(bump)) };
            assert!(pushed.is_ok());
            assert!(!bag.is_empty());
            assert_eq!(observed(), 0);
        }

        // One more push must be rejected, still without running anything.
        let result = unsafe { bag.try_push(Deferred::new(bump)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(observed(), 0);

        // Dropping the bag runs every stored function exactly once.
        drop(bag);
        assert_eq!(observed(), MAX_OBJECTS);
    }
}