swmr_epoch/reader.rs
1use crate::state::{INACTIVE_EPOCH, ReaderSlot, SharedState};
2use crate::sync::{Arc, AtomicUsize, Cell, Ordering};
3
4/// A reader thread's local epoch state.
5///
6/// Each reader thread should create exactly one `LocalEpoch` via `EpochGcDomain::register_reader()`.
7/// It is `!Sync` (due to `Cell`) and must be stored per-thread.
8///
9/// The `LocalEpoch` is used to:
10/// - Pin the thread to the current epoch via `pin()`.
11/// - Obtain a `PinGuard` that protects access to `EpochPtr` values.
12///
13/// **Thread Safety**: `LocalEpoch` is not `Sync` and must be used by only one thread.
14///
15/// 读者线程的本地纪元状态。
16/// 每个读者线程应该通过 `EpochGcDomain::register_reader()` 创建恰好一个 `LocalEpoch`。
17/// 它是 `!Sync` 的(因为 `Cell`),必须在每个线程中存储。
18/// `LocalEpoch` 用于:
19/// - 通过 `pin()` 将线程钉住到当前纪元。
20/// - 获取保护对 `EpochPtr` 值的访问的 `PinGuard`。
21/// **线程安全性**:`LocalEpoch` 不是 `Sync` 的,必须仅由一个线程使用。
22pub struct LocalEpoch {
23 slot: Arc<ReaderSlot>,
24 shared: Arc<SharedState>,
25 pin_count: Cell<usize>,
26}
27
28impl LocalEpoch {
29 pub(crate) fn new(shared: Arc<SharedState>) -> Self {
30 let slot = Arc::new(ReaderSlot {
31 active_epoch: AtomicUsize::new(INACTIVE_EPOCH),
32 });
33
34 // Register the reader immediately in the shared readers list
35 shared.readers.lock().push(Arc::clone(&slot));
36
37 LocalEpoch {
38 slot,
39 shared,
40 pin_count: Cell::new(0),
41 }
42 }
43
44 /// Pin this thread to the current epoch.
45 ///
46 /// Returns a `PinGuard` that keeps the thread pinned for its lifetime.
47 ///
48 /// **Reentrancy**: This method is reentrant. Multiple calls can be nested, and the thread
49 /// remains pinned until all returned guards are dropped. You can also clone a guard to create
50 /// additional references: `let guard2 = guard1.clone();`
51 ///
52 /// **Example**:
53 /// ```ignore
54 /// let guard1 = local_epoch.pin();
55 /// let guard2 = local_epoch.pin(); // Reentrant call
56 /// let guard3 = guard1.clone(); // Clone for nested scope
57 /// // Thread remains pinned until all three guards are dropped
58 /// ```
59 ///
60 /// While pinned, the thread is considered "active" at a particular epoch,
61 /// and the garbage collector will not reclaim data from that epoch.
62 ///
63 /// 将此线程钉住到当前纪元。
64 ///
65 /// 返回一个 `PinGuard`,在其生命周期内保持线程被钉住。
66 ///
67 /// **可重入性**:此方法是可重入的。多个调用可以嵌套,线程在所有返回的守卫被 drop 之前保持被钉住。
68 /// 你也可以克隆一个守卫来创建额外的引用:`let guard2 = guard1.clone();`
69 ///
70 /// **示例**:
71 /// ```ignore
72 /// let guard1 = local_epoch.pin();
73 /// let guard2 = local_epoch.pin(); // 可重入调用
74 /// let guard3 = guard1.clone(); // 克隆用于嵌套作用域
75 /// // 线程保持被钉住直到所有三个守卫被 drop
76 /// ```
77 ///
78 /// 当被钉住时,线程被认为在特定纪元"活跃",垃圾回收器不会回收该纪元的数据。
79 #[inline]
80 pub fn pin(&self) -> PinGuard<'_> {
81 let pin_count = self.pin_count.get();
82
83 if pin_count == 0 {
84 loop {
85 let current_epoch = self.shared.global_epoch.load(Ordering::Acquire);
86 self.slot
87 .active_epoch
88 .store(current_epoch, Ordering::Release);
89
90 let min_active = self.shared.min_active_epoch.load(Ordering::Acquire);
91 if current_epoch >= min_active {
92 break;
93 }
94 std::hint::spin_loop();
95 }
96 }
97
98 self.pin_count.set(pin_count + 1);
99
100 PinGuard { reader: self }
101 }
102}
103
104/// A guard that keeps the current thread pinned to an epoch.
105///
106/// `PinGuard` is obtained by calling `LocalEpoch::pin()`.
107/// It is `!Send` and `!Sync` because it references a `!Sync` `LocalEpoch`.
108/// Its lifetime is bound to the `LocalEpoch` it came from.
109///
110/// While a `PinGuard` is held, the thread is considered "active" at a particular epoch,
111/// and the garbage collector will not reclaim data from that epoch.
112///
113/// `PinGuard` supports internal cloning via reference counting (increments the pin count),
114/// allowing nested pinning. The thread remains pinned until all cloned guards are dropped.
115///
116/// **Safety**: The `PinGuard` is the mechanism that ensures safe concurrent access to
117/// `EpochPtr` values. Readers must always hold a valid `PinGuard` when accessing
118/// shared data through `EpochPtr::load()`.
119///
120/// 一个保持当前线程被钉住到一个纪元的守卫。
121/// `PinGuard` 通过调用 `LocalEpoch::pin()` 获得。
122/// 它是 `!Send` 和 `!Sync` 的,因为它引用了一个 `!Sync` 的 `LocalEpoch`。
123/// 它的生命周期被绑定到它来自的 `LocalEpoch`。
124/// 当 `PinGuard` 被持有时,线程被认为在特定纪元"活跃",
125/// 垃圾回收器不会回收该纪元的数据。
126/// `PinGuard` 支持通过引用计数的内部克隆(增加 pin 计数),允许嵌套 pinning。
127/// 线程保持被钉住直到所有克隆的守卫被 drop。
128/// **安全性**:`PinGuard` 是确保对 `EpochPtr` 值安全并发访问的机制。
129/// 读者在通过 `EpochPtr::load()` 访问共享数据时必须始终持有有效的 `PinGuard`。
130#[must_use]
131pub struct PinGuard<'a> {
132 reader: &'a LocalEpoch,
133}
134
135impl<'a> Clone for PinGuard<'a> {
136 /// Clone this guard to create a nested pin.
137 ///
138 /// Cloning increments the pin count, and the thread remains pinned until all cloned guards
139 /// are dropped. This allows multiple scopes to hold pins simultaneously.
140 ///
141 /// 克隆此守卫以创建嵌套 pin。
142 ///
143 /// 克隆会增加 pin 计数,线程保持被钉住直到所有克隆的守卫被 drop。
144 /// 这允许多个作用域同时持有 pin。
145 #[inline]
146 fn clone(&self) -> Self {
147 let pin_count = self.reader.pin_count.get();
148
149 assert!(
150 pin_count > 0,
151 "BUG: Cloning a PinGuard in an unpinned state (pin_count = 0). \
152 This indicates incorrect API usage or a library bug."
153 );
154
155 self.reader.pin_count.set(pin_count + 1);
156
157 PinGuard {
158 reader: self.reader,
159 }
160 }
161}
162
163impl<'a> Drop for PinGuard<'a> {
164 #[inline]
165 fn drop(&mut self) {
166 let pin_count = self.reader.pin_count.get();
167
168 assert!(
169 pin_count > 0,
170 "BUG: Dropping a PinGuard in an unpinned state (pin_count = 0). \
171 This indicates incorrect API usage or a library bug."
172 );
173
174 if pin_count == 1 {
175 self.reader
176 .slot
177 .active_epoch
178 .store(INACTIVE_EPOCH, Ordering::Release);
179 }
180
181 self.reader.pin_count.set(pin_count - 1);
182 }
183}