swmr_epoch/
reader.rs

1use crate::state::{INACTIVE_EPOCH, ReaderSlot, SharedState};
2use crate::sync::{Arc, AtomicUsize, Cell, Ordering};
3
4/// A reader thread's local epoch state.
5///
6/// Each reader thread should create exactly one `LocalEpoch` via `EpochGcDomain::register_reader()`.
7/// It is `!Sync` (due to `Cell`) and must be stored per-thread.
8///
9/// The `LocalEpoch` is used to:
10/// - Pin the thread to the current epoch via `pin()`.
11/// - Obtain a `PinGuard` that protects access to `EpochPtr` values.
12///
13/// **Thread Safety**: `LocalEpoch` is not `Sync` and must be used by only one thread.
14///
15/// 读者线程的本地纪元状态。
16/// 每个读者线程应该通过 `EpochGcDomain::register_reader()` 创建恰好一个 `LocalEpoch`。
17/// 它是 `!Sync` 的(因为 `Cell`),必须在每个线程中存储。
18/// `LocalEpoch` 用于:
19/// - 通过 `pin()` 将线程钉住到当前纪元。
20/// - 获取保护对 `EpochPtr` 值的访问的 `PinGuard`。
21/// **线程安全性**:`LocalEpoch` 不是 `Sync` 的,必须仅由一个线程使用。
22pub struct LocalEpoch {
23    slot: Arc<ReaderSlot>,
24    shared: Arc<SharedState>,
25    pin_count: Cell<usize>,
26}
27
28impl LocalEpoch {
29    pub(crate) fn new(shared: Arc<SharedState>) -> Self {
30        let slot = Arc::new(ReaderSlot {
31            active_epoch: AtomicUsize::new(INACTIVE_EPOCH),
32        });
33
34        // Register the reader immediately in the shared readers list
35        shared.readers.lock().push(Arc::clone(&slot));
36
37        LocalEpoch {
38            slot,
39            shared,
40            pin_count: Cell::new(0),
41        }
42    }
43
44    /// Pin this thread to the current epoch.
45    ///
46    /// Returns a `PinGuard` that keeps the thread pinned for its lifetime.
47    ///
48    /// **Reentrancy**: This method is reentrant. Multiple calls can be nested, and the thread
49    /// remains pinned until all returned guards are dropped. You can also clone a guard to create
50    /// additional references: `let guard2 = guard1.clone();`
51    ///
52    /// **Example**:
53    /// ```ignore
54    /// let guard1 = local_epoch.pin();
55    /// let guard2 = local_epoch.pin();  // Reentrant call
56    /// let guard3 = guard1.clone();     // Clone for nested scope
57    /// // Thread remains pinned until all three guards are dropped
58    /// ```
59    ///
60    /// While pinned, the thread is considered "active" at a particular epoch,
61    /// and the garbage collector will not reclaim data from that epoch.
62    ///
63    /// 将此线程钉住到当前纪元。
64    ///
65    /// 返回一个 `PinGuard`,在其生命周期内保持线程被钉住。
66    ///
67    /// **可重入性**:此方法是可重入的。多个调用可以嵌套,线程在所有返回的守卫被 drop 之前保持被钉住。
68    /// 你也可以克隆一个守卫来创建额外的引用:`let guard2 = guard1.clone();`
69    ///
70    /// **示例**:
71    /// ```ignore
72    /// let guard1 = local_epoch.pin();
73    /// let guard2 = local_epoch.pin();  // 可重入调用
74    /// let guard3 = guard1.clone();     // 克隆用于嵌套作用域
75    /// // 线程保持被钉住直到所有三个守卫被 drop
76    /// ```
77    ///
78    /// 当被钉住时,线程被认为在特定纪元"活跃",垃圾回收器不会回收该纪元的数据。
79    #[inline]
80    pub fn pin(&self) -> PinGuard<'_> {
81        let pin_count = self.pin_count.get();
82
83        if pin_count == 0 {
84            loop {
85                let current_epoch = self.shared.global_epoch.load(Ordering::Acquire);
86                self.slot
87                    .active_epoch
88                    .store(current_epoch, Ordering::Release);
89
90                let min_active = self.shared.min_active_epoch.load(Ordering::Acquire);
91                if current_epoch >= min_active {
92                    break;
93                }
94                std::hint::spin_loop();
95            }
96        }
97
98        self.pin_count.set(pin_count + 1);
99
100        PinGuard { reader: self }
101    }
102}
103
104/// A guard that keeps the current thread pinned to an epoch.
105///
106/// `PinGuard` is obtained by calling `LocalEpoch::pin()`.
107/// It is `!Send` and `!Sync` because it references a `!Sync` `LocalEpoch`.
108/// Its lifetime is bound to the `LocalEpoch` it came from.
109///
110/// While a `PinGuard` is held, the thread is considered "active" at a particular epoch,
111/// and the garbage collector will not reclaim data from that epoch.
112///
113/// `PinGuard` supports internal cloning via reference counting (increments the pin count),
114/// allowing nested pinning. The thread remains pinned until all cloned guards are dropped.
115///
116/// **Safety**: The `PinGuard` is the mechanism that ensures safe concurrent access to
117/// `EpochPtr` values. Readers must always hold a valid `PinGuard` when accessing
118/// shared data through `EpochPtr::load()`.
119///
120/// 一个保持当前线程被钉住到一个纪元的守卫。
121/// `PinGuard` 通过调用 `LocalEpoch::pin()` 获得。
122/// 它是 `!Send` 和 `!Sync` 的,因为它引用了一个 `!Sync` 的 `LocalEpoch`。
123/// 它的生命周期被绑定到它来自的 `LocalEpoch`。
124/// 当 `PinGuard` 被持有时,线程被认为在特定纪元"活跃",
125/// 垃圾回收器不会回收该纪元的数据。
126/// `PinGuard` 支持通过引用计数的内部克隆(增加 pin 计数),允许嵌套 pinning。
127/// 线程保持被钉住直到所有克隆的守卫被 drop。
128/// **安全性**:`PinGuard` 是确保对 `EpochPtr` 值安全并发访问的机制。
129/// 读者在通过 `EpochPtr::load()` 访问共享数据时必须始终持有有效的 `PinGuard`。
130#[must_use]
131pub struct PinGuard<'a> {
132    reader: &'a LocalEpoch,
133}
134
135impl<'a> Clone for PinGuard<'a> {
136    /// Clone this guard to create a nested pin.
137    ///
138    /// Cloning increments the pin count, and the thread remains pinned until all cloned guards
139    /// are dropped. This allows multiple scopes to hold pins simultaneously.
140    ///
141    /// 克隆此守卫以创建嵌套 pin。
142    ///
143    /// 克隆会增加 pin 计数,线程保持被钉住直到所有克隆的守卫被 drop。
144    /// 这允许多个作用域同时持有 pin。
145    #[inline]
146    fn clone(&self) -> Self {
147        let pin_count = self.reader.pin_count.get();
148
149        assert!(
150            pin_count > 0,
151            "BUG: Cloning a PinGuard in an unpinned state (pin_count = 0). \
152             This indicates incorrect API usage or a library bug."
153        );
154
155        self.reader.pin_count.set(pin_count + 1);
156
157        PinGuard {
158            reader: self.reader,
159        }
160    }
161}
162
163impl<'a> Drop for PinGuard<'a> {
164    #[inline]
165    fn drop(&mut self) {
166        let pin_count = self.reader.pin_count.get();
167
168        assert!(
169            pin_count > 0,
170            "BUG: Dropping a PinGuard in an unpinned state (pin_count = 0). \
171             This indicates incorrect API usage or a library bug."
172        );
173
174        if pin_count == 1 {
175            self.reader
176                .slot
177                .active_epoch
178                .store(INACTIVE_EPOCH, Ordering::Release);
179        }
180
181        self.reader.pin_count.set(pin_count - 1);
182    }
183}