1use crate::{slot::Vec, Slot, SHARD_COUNT};
6use alloc::alloc::{alloc, dealloc, handle_alloc_error, realloc, Layout};
7use core::{
8 cell::{Cell, UnsafeCell},
9 fmt,
10 marker::PhantomData,
11 mem::{self, ManuallyDrop},
12 panic::RefUnwindSafe,
13 ptr, slice,
14 sync::atomic::{
15 self, AtomicPtr, AtomicUsize,
16 Ordering::{Acquire, Relaxed, Release, SeqCst},
17 },
18};
19use std::{num::NonZeroUsize, thread};
20use thread_local::ThreadLocal;
21
22#[allow(clippy::useless_transmute)]
25const INACTIVE: *mut Node = unsafe { mem::transmute(usize::MAX) };
26
27const MIN_RETIRED_LEN: usize = 64;
28
/// A reference-counted, shareable handle to a [`Collector`].
///
/// Cloning increments `Collector::handle_count`; dropping the last handle
/// frees the collector (see the `Clone` and `Drop` impls below).
pub struct CollectorHandle {
    // Heap pointer from `Box::into_raw`; never null. Each `RetirementList`
    // also holds an uncounted copy of a handle via `ManuallyDrop`.
    ptr: *mut Collector,
}
32
33unsafe impl Send for CollectorHandle {}
35
36unsafe impl Sync for CollectorHandle {}
38
39impl Default for CollectorHandle {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl CollectorHandle {
46 #[must_use]
47 pub fn new() -> Self {
48 if SHARD_COUNT.load(Relaxed) == 0 {
49 let num_cpus = thread::available_parallelism()
50 .map(NonZeroUsize::get)
51 .unwrap_or(1);
52 SHARD_COUNT.store(num_cpus.next_power_of_two(), Relaxed);
53 }
54
55 let ptr = Box::into_raw(Box::new(Collector {
56 retirement_lists: ThreadLocal::new(),
57 handle_count: AtomicUsize::new(1),
58 }));
59
60 unsafe { CollectorHandle { ptr } }
63 }
64
65 #[inline]
72 #[must_use]
73 pub unsafe fn pin(&self) -> Guard<'_> {
74 let mut is_fresh_entry = false;
75
76 let retirement_list = self.collector().retirement_lists.get_or(|| {
77 is_fresh_entry = true;
78
79 crate::set_shard_index();
80
81 RetirementList {
82 head: AtomicPtr::new(INACTIVE),
83 collector: ManuallyDrop::new(unsafe { ptr::read(self) }),
86 guard_count: Cell::new(0),
87 batch: UnsafeCell::new(LocalBatch::new()),
88 }
89 });
90
91 if is_fresh_entry {
92 atomic::fence(SeqCst);
93 }
94
95 retirement_list.pin()
96 }
97
98 #[inline]
99 fn collector(&self) -> &Collector {
100 unsafe { &*self.ptr }
102 }
103}
104
105impl Clone for CollectorHandle {
106 #[inline]
107 fn clone(&self) -> Self {
108 #[allow(clippy::cast_sign_loss)]
109 if self.collector().handle_count.fetch_add(1, Relaxed) > isize::MAX as usize {
110 std::process::abort();
111 }
112
113 unsafe { CollectorHandle { ptr: self.ptr } }
116 }
117}
118
119impl fmt::Debug for CollectorHandle {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("CollectorHandle").finish_non_exhaustive()
122 }
123}
124
125impl PartialEq for CollectorHandle {
126 #[inline]
127 fn eq(&self, other: &Self) -> bool {
128 self.ptr == other.ptr
129 }
130}
131
132impl Eq for CollectorHandle {}
133
impl Drop for CollectorHandle {
    /// Decrements the handle count; the last handle frees the collector.
    #[inline]
    fn drop(&mut self) {
        // Release decrement + Acquire fence on the last handle is the
        // classic `Arc`-style pairing: all other threads' uses of the
        // collector happen-before the free below.
        if self.collector().handle_count.fetch_sub(1, Release) == 1 {
            atomic::fence(Acquire);

            // SAFETY: this was the last counted handle, and `ptr` came
            // from `Box::into_raw`, so reconstituting the Box (and
            // dropping it) is sound.
            let _ = unsafe { Box::from_raw(self.ptr) };
        }
    }
}
148
/// Shared state behind every `CollectorHandle`.
struct Collector {
    // One retirement list per thread that has pinned this collector.
    retirement_lists: ThreadLocal<RetirementList>,
    // Number of live `CollectorHandle`s. The `ManuallyDrop` copies held by
    // retirement lists (created with `ptr::read` in `pin`) are not counted.
    handle_count: AtomicUsize,
}
155
/// Per-thread reclamation state.
// 128-byte alignment pads each list onto its own cache line(s), presumably
// to avoid false sharing between threads — TODO(review): confirm target.
#[repr(align(128))]
pub(crate) struct RetirementList {
    // Lock-free stack of nodes pushed at this thread by other threads'
    // `retire` calls. `INACTIVE` means the thread is not pinned; null
    // means pinned with nothing pending.
    head: AtomicPtr<Node>,
    // Uncounted back-reference to the owning collector; wrapped in
    // `ManuallyDrop` so it is never dropped (the collector owns this list
    // via its `ThreadLocal`).
    collector: ManuallyDrop<CollectorHandle>,
    // Number of live `Guard`s on this thread (pinning is re-entrant).
    guard_count: Cell<usize>,
    // Locally buffered retirements, flushed by `retire`.
    batch: UnsafeCell<LocalBatch>,
}
168
169unsafe impl Sync for RetirementList {}
171
172impl RefUnwindSafe for RetirementList {}
175
impl RetirementList {
    /// Registers one more guard on this thread; the first guard marks the
    /// thread active via `enter`.
    #[inline]
    pub(crate) fn pin(&self) -> Guard<'_> {
        let guard_count = self.guard_count.get();
        // `checked_add` turns a pathological guard-count overflow into a
        // panic instead of a silent wrap.
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            // First guard: transition INACTIVE -> active.
            unsafe { self.enter() };
        }

        // SAFETY: the count above was incremented for this guard.
        unsafe { Guard::new(self) }
    }

    /// Marks the thread active (`head`: INACTIVE -> null). The SeqCst
    /// fence orders the store against this thread's subsequent shared
    /// reads, pairing with the SeqCst fence in `retire`.
    #[inline]
    unsafe fn enter(&self) {
        self.head.store(ptr::null_mut(), Relaxed);
        atomic::fence(SeqCst);
    }

    /// Buffers one retired allocation (`reclaim(index, slots)` frees it
    /// later) and flushes via `retire` once the batch reaches
    /// `MIN_RETIRED_LEN`.
    #[inline]
    unsafe fn defer_reclaim(
        &self,
        index: u32,
        slots: *const u8,
        reclaim: unsafe fn(u32, *const u8),
    ) {
        // SAFETY: `batch` is only accessed by the owning thread.
        let batch = unsafe { &mut *self.batch.get() };

        unsafe { batch.push(index, slots, reclaim) };

        if batch.len() == MIN_RETIRED_LEN {
            unsafe { self.retire() };
        }
    }

    /// Flushes the local batch: a node is pushed onto every currently
    /// active thread's retirement stack, and the batch is freed once all
    /// of those threads have unpinned (its ref_count reaches zero).
    #[inline(never)]
    unsafe fn retire(&self) {
        let batch = unsafe { &mut *self.batch.get() };

        if batch.is_empty() {
            return;
        }

        // Detach the batch, leaving a fresh empty one behind for future
        // retirements on this thread.
        let mut batch = mem::take(batch);
        let retired_len = batch.len();
        let mut len = 0;

        unsafe { batch.set_retired_len(retired_len) };

        // Pairs with the SeqCst fence in `enter`: a thread that activated
        // before this point is seen as active in the scan below.
        atomic::fence(SeqCst);

        // Reserve one node per active thread. The first `retired_len`
        // nodes double as retirement records and stack links; if more
        // threads are active than entries exist, pad with no-op nodes.
        for retirement_list in &self.collector.collector().retirement_lists {
            if retirement_list.head.load(Relaxed) == INACTIVE {
                continue;
            }

            if len >= retired_len {
                unsafe { batch.push(0, ptr::null(), |_, _| {}) };
            }

            // SAFETY: `len < batch.len()` — a node was pushed above if
            // the reserved prefix was exhausted.
            let node = unsafe { batch.as_mut_slice().get_unchecked_mut(len) };

            // Record which thread's stack this node is destined for.
            node.link.retirement_list = &retirement_list.head;

            len += 1;
        }

        let nodes = batch.as_mut_ptr();
        let batch = batch.into_raw();

        atomic::fence(Acquire);

        // Push each reserved node onto its target stack. A thread that
        // went INACTIVE in the meantime is skipped, and `len` (the number
        // of outstanding references) shrinks accordingly — the mutation
        // does not affect the range bound, only the final ref_count.
        #[allow(clippy::mut_range_bound)]
        'outer: for node_index in 0..len {
            let node = unsafe { nodes.add(node_index) };

            unsafe { (*node).batch = batch };

            let list = unsafe { &*(*node).link.retirement_list };

            let mut head = list.load(Relaxed);

            loop {
                if head == INACTIVE {
                    atomic::fence(Acquire);
                    len -= 1;
                    continue 'outer;
                }

                // Repurposes the union: the node now acts as a stack link.
                unsafe { (*node).link.next = head };

                match list.compare_exchange_weak(head, node, Release, Relaxed) {
                    Ok(_) => break,
                    Err(new_head) => head = new_head,
                }
            }
        }

        // Credit the batch with the number of successfully pushed nodes.
        // `traverse` may have concurrently decremented the count below
        // zero (wrapping), so wrapping arithmetic is used; a net result of
        // zero means no references remain and the batch can be freed now.
        if unsafe { (*batch).ref_count.fetch_add(len, Release) }.wrapping_add(len) == 0 {
            unsafe { self.reclaim(batch) };
        }
    }

    /// Deactivates the thread: swaps `head` back to INACTIVE and processes
    /// any nodes other threads pushed at us while we were pinned.
    #[inline]
    unsafe fn leave(&self) {
        let head = self.head.swap(INACTIVE, Release);

        if !head.is_null() {
            unsafe { self.traverse(head) };
        }
    }

    /// Walks a popped stack, dropping one batch reference per node and
    /// freeing batches whose count reaches zero.
    #[cold]
    unsafe fn traverse(&self, mut head: *mut Node) {
        // Synchronizes with the Release CAS in `retire` that published
        // the nodes.
        atomic::fence(Acquire);

        while !head.is_null() {
            let batch = unsafe { (*head).batch };

            // `next` must be read BEFORE the decrement: once the batch's
            // count can reach zero, another thread may free the batch —
            // and this node's memory with it.
            let next = unsafe { (*head).link.next };

            let ref_count = unsafe { (*batch).ref_count.fetch_sub(1, Release) }.wrapping_sub(1);

            if ref_count == 0 {
                unsafe { self.reclaim(batch) };
            }

            head = next;
        }
    }

    /// Frees every retired allocation in `batch`, then the batch itself
    /// (via `LocalBatch`'s `Drop` when the local binding goes out of scope).
    unsafe fn reclaim(&self, batch: *mut Batch) {
        // Synchronizes with the Release decrements in `traverse`.
        atomic::fence(Acquire);

        let mut batch = unsafe { LocalBatch::from_raw(batch) };

        // Only the leading `retired_len` nodes are real retirements;
        // trailing nodes are padding links.
        for node in batch.retired_as_mut_slice() {
            unsafe { (node.reclaim)(node.index, node.slots) };
        }
    }
}
354
355impl Drop for Collector {
356 fn drop(&mut self) {
357 atomic::fence(Acquire);
358
359 for retirement_list in &mut self.retirement_lists {
360 let batch = retirement_list.batch.get_mut();
361
362 if batch.is_empty() {
363 continue;
364 }
365
366 for node in batch.retired_as_mut_slice() {
367 unsafe { (node.reclaim)(node.index, node.slots) };
373 }
374 }
375 }
376}
377
/// Heap header for a batch of retired allocations, followed inline by up
/// to `capacity` nodes — a C-style flexible array, hence `repr(C)` and the
/// zero-length `nodes` tail.
#[repr(C)]
struct Batch {
    // Outstanding references from thread retirement stacks; freed when it
    // reaches zero. Wrapping arithmetic — see `retire`/`traverse`.
    ref_count: AtomicUsize,
    // Allocated node capacity.
    capacity: usize,
    // Initialized nodes (retirement records plus padding links).
    len: usize,
    // Leading nodes that are real retirements (set by `retire`).
    retired_len: usize,
    // Start of the inline node array.
    nodes: [Node; 0],
}
392
/// One batch entry: a retirement record, a padding stack link, or both
/// (the first `retired_len` nodes are records that also serve as links).
struct Node {
    // Stack linkage; see `NodeLink` for the two-phase usage.
    link: NodeLink,
    // Back-pointer to the owning batch, filled in during `retire`.
    batch: *mut Batch,
    // Forwarded to `reclaim`.
    index: u32,
    // Pointer to the retired slots; forwarded to `reclaim`.
    slots: *const u8,
    // Type-erased reclamation callback (see `transmute_reclaim_fp`).
    reclaim: unsafe fn(u32, *const u8),
}
405
/// Two-phase link. During `retire`'s reservation scan it records which
/// thread's stack the node is destined for; once pushed, it is overwritten
/// with that stack's `next` pointer.
union NodeLink {
    retirement_list: *const AtomicPtr<Node>,
    next: *mut Node,
}
412
/// Owning handle to a heap-allocated `Batch`: a minimal Vec-like wrapper
/// over the header plus inline node array.
struct LocalBatch {
    ptr: *mut Batch,
}
416
417unsafe impl Send for LocalBatch {}
419
420unsafe impl Sync for LocalBatch {}
422
423impl Default for LocalBatch {
424 fn default() -> Self {
425 LocalBatch::new()
426 }
427}
428
429impl LocalBatch {
430 const MIN_CAP: usize = 4;
431
432 fn new() -> Self {
433 let layout = layout_for_capacity(Self::MIN_CAP);
434
435 let ptr = unsafe { alloc(layout) }.cast::<Batch>();
437
438 if ptr.is_null() {
439 handle_alloc_error(layout);
440 }
441
442 unsafe {
444 *ptr::addr_of_mut!((*ptr).ref_count) = AtomicUsize::new(0);
445 *ptr::addr_of_mut!((*ptr).capacity) = Self::MIN_CAP;
446 *ptr::addr_of_mut!((*ptr).len) = 0;
447 *ptr::addr_of_mut!((*ptr).retired_len) = 0;
448 }
449
450 LocalBatch { ptr }
451 }
452
453 #[inline]
454 unsafe fn from_raw(ptr: *mut Batch) -> Self {
455 LocalBatch { ptr }
456 }
457
458 #[inline]
459 fn into_raw(self) -> *mut Batch {
460 ManuallyDrop::new(self).ptr
461 }
462
463 #[inline]
464 fn capacity(&self) -> usize {
465 unsafe { (*self.ptr).capacity }
467 }
468
469 #[inline]
470 fn len(&self) -> usize {
471 unsafe { (*self.ptr).len }
473 }
474
475 #[inline]
476 fn retired_len(&self) -> usize {
477 unsafe { (*self.ptr).retired_len }
479 }
480
481 #[inline]
482 fn is_empty(&self) -> bool {
483 self.len() == 0
484 }
485
486 #[inline]
487 fn as_mut_ptr(&mut self) -> *mut Node {
488 unsafe { ptr::addr_of_mut!((*self.ptr).nodes) }.cast()
490 }
491
492 fn as_mut_slice(&mut self) -> &mut [Node] {
493 unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.len()) }
496 }
497
498 #[inline]
499 fn retired_as_mut_slice(&mut self) -> &mut [Node] {
500 unsafe { slice::from_raw_parts_mut(self.as_mut_ptr(), self.retired_len()) }
503 }
504
505 #[inline]
506 unsafe fn push(&mut self, index: u32, slots: *const u8, reclaim: unsafe fn(u32, *const u8)) {
507 let len = self.len();
508
509 if len == self.capacity() {
510 self.grow_one();
511 }
512
513 let node = Node {
514 link: NodeLink {
515 retirement_list: ptr::null(),
516 },
517 batch: ptr::null_mut(),
518 index,
519 slots,
520 reclaim,
521 };
522
523 unsafe { self.as_mut_ptr().add(len).write(node) };
525
526 unsafe { self.set_len(len + 1) };
528 }
529
530 #[inline(never)]
531 fn grow_one(&mut self) {
532 let capacity = self.capacity();
533 let new_capacity = capacity * 2;
534 let layout = layout_for_capacity(capacity);
535 let new_layout = layout_for_capacity(new_capacity);
536
537 let new_ptr = unsafe { realloc(self.ptr.cast(), layout, new_layout.size()) };
543
544 if new_ptr.is_null() {
545 handle_alloc_error(new_layout);
546 }
547
548 self.ptr = new_ptr.cast();
549
550 unsafe { (*self.ptr).capacity = new_capacity };
552 }
553
554 #[inline]
555 unsafe fn set_len(&mut self, len: usize) {
556 unsafe { (*self.ptr).len = len };
558 }
559
560 #[inline]
561 unsafe fn set_retired_len(&mut self, len: usize) {
562 unsafe { (*self.ptr).retired_len = len };
564 }
565}
566
impl Drop for LocalBatch {
    /// Frees the allocation. `Node` has no destructor, so the node tail
    /// (initialized or not) needs no per-element drop.
    fn drop(&mut self) {
        let layout = layout_for_capacity(self.capacity());

        // SAFETY: `ptr` was allocated (or reallocated) with exactly this
        // layout for the current capacity.
        unsafe { dealloc(self.ptr.cast(), layout) };
    }
}
577
578fn layout_for_capacity(capacity: usize) -> Layout {
579 Layout::new::<Batch>()
580 .extend(Layout::array::<Node>(capacity).unwrap())
581 .unwrap()
582 .0
583}
584
/// Keeps the current thread's retirement list active; obtained from
/// [`CollectorHandle::pin`]. Dropping the last guard deactivates the
/// thread and processes pending retirements.
pub struct Guard<'a> {
    retirement_list: &'a RetirementList,
    // Raw-pointer marker makes `Guard` !Send and !Sync: the guard count
    // and local batch are thread-local state.
    marker: PhantomData<*const ()>,
}
589
impl<'a> Guard<'a> {
    /// Wraps a reference to the thread's retirement list.
    ///
    /// # Safety
    ///
    /// The caller must have already incremented `guard_count` on behalf of
    /// this guard (see `RetirementList::pin` and `Guard::clone`).
    #[inline]
    unsafe fn new(retirement_list: &'a RetirementList) -> Self {
        Guard {
            retirement_list,
            marker: PhantomData,
        }
    }

    /// The collector this guard is pinned to.
    #[inline]
    #[must_use]
    pub fn collector(&self) -> &CollectorHandle {
        &self.retirement_list.collector
    }

    /// Schedules the slots of vector `index` for reclamation once every
    /// currently pinned thread has unpinned.
    #[inline]
    pub(crate) unsafe fn defer_reclaim<V>(&self, index: u32, slots: &Vec<V>) {
        let slots = slots.as_ptr().cast();
        // Erase `V`: the callback is re-specialized from the raw pointer
        // when invoked (see `transmute_reclaim_fp`).
        let reclaim = transmute_reclaim_fp(crate::reclaim::<V>);

        unsafe { self.retirement_list.defer_reclaim(index, slots, reclaim) };
    }

    /// Like `defer_reclaim`, but routes through the invalidated-slot
    /// reclamation callback.
    #[inline]
    pub(crate) unsafe fn defer_reclaim_invalidated<V>(&self, index: u32, slots: &Vec<V>) {
        let slots = slots.as_ptr().cast();
        let reclaim = transmute_reclaim_fp(crate::reclaim_invalidated::<V>);

        unsafe { self.retirement_list.defer_reclaim(index, slots, reclaim) }
    }

    /// Forces an immediate `retire` pass on this thread's buffered
    /// retirements instead of waiting for `MIN_RETIRED_LEN`.
    #[inline]
    pub fn flush(&self) {
        unsafe { self.retirement_list.retire() };
    }
}
637
/// Erases the slot type from a reclamation callback.
///
/// The transmute only changes one argument's pointee type: `*const
/// Slot<V>` and `*const u8` are both thin raw pointers with the same ABI,
/// and callers (see `Guard::defer_reclaim`) only ever invoke the erased
/// function with a pointer obtained from `Vec::<V>::as_ptr` for the same
/// `V`, so the call is made at the original type in disguise.
fn transmute_reclaim_fp<V>(fp: unsafe fn(u32, *const Slot<V>)) -> unsafe fn(u32, *const u8) {
    unsafe { mem::transmute::<unsafe fn(u32, *const Slot<V>), unsafe fn(u32, *const u8)>(fp) }
}
642
643impl fmt::Debug for Guard<'_> {
644 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
645 f.debug_struct("Guard").finish_non_exhaustive()
646 }
647}
648
impl Clone for Guard<'_> {
    /// Re-pins by bumping the guard count. Unlike `RetirementList::pin`,
    /// no `enter` call is needed: a live guard implies the count is
    /// already non-zero, so the thread is already active.
    #[inline]
    fn clone(&self) -> Self {
        let guard_count = self.retirement_list.guard_count.get();
        self.retirement_list
            .guard_count
            .set(guard_count.checked_add(1).unwrap());

        // SAFETY: the count was just incremented for the new guard.
        unsafe { Guard::new(self.retirement_list) }
    }
}
664
impl Drop for Guard<'_> {
    #[inline]
    fn drop(&mut self) {
        let guard_count = self.retirement_list.guard_count.get();
        // No underflow: every guard was created with a matching increment.
        self.retirement_list.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            // Last guard on this thread: deactivate the retirement list
            // and process any nodes pushed at us while we were pinned.
            unsafe { self.retirement_list.leave() };
        }
    }
}
679}