swmr_cell/lib.rs
1//! # SWMR Version-Based Single Object
2//!
3//! This crate provides a single-writer, multi-reader (SWMR) cell that supports
4//! concurrent wait-free reads and lock-free writes using version-based garbage collection.
5//!
6//! ## Core Concepts
7//!
8//! - **Single Object**: The `swmr_cell` library manages a single versioned object per `SwmrCell`.
9//! - **Version**: The version counter represents the state of the object. Each write increments the version.
10//! - **Pinning**: Readers pin the current version when they start reading, preventing the writer from reclaiming that version (and any older versions still visible to other readers) until they are done.
11//!
12//! ## Typical Usage
13//!
14//! ```rust
15//! use swmr_cell::SwmrCell;
16//!
17//! // 1. Create a new SWMR cell with an initial value
18//! let mut cell = SwmrCell::new(42i32);
19//!
20//! // 2. Create a local reader for this thread (or pass to another thread)
21//! let local = cell.local_reader();
22//!
23//! // 3. Pin and read the value by dereferencing the guard
24//! let guard = local.pin();
25//! assert_eq!(*guard, 42);
26//! drop(guard);
27//!
28//! // 4. Writer updates the value
29//! cell.store(100i32);
30//!
31//! // 5. Read the new value
32//! let guard = local.pin();
33//! assert_eq!(*guard, 100);
34//! drop(guard);
35//!
36//! // 6. Manually collect garbage (optional, happens automatically too)
37//! cell.collect();
38//! ```
39#![cfg_attr(not(feature = "std"), no_std)]
40
41#[cfg(not(feature = "std"))]
42extern crate alloc;
43
44#[cfg(all(not(feature = "std"), test))]
45extern crate std;
46
47mod shim;
48
49#[cfg(test)]
50mod tests;
51use crate::shim::{
52 Arc, AtomicPtr, AtomicUsize, Box, Cell, Mutex, Ordering, Vec, VecDeque, heavy_barrier,
53 light_barrier,
54};
55use core::{fmt, marker::PhantomData, ops::Deref};
56
57/// Default threshold for automatic garbage reclamation (count of retired nodes).
58/// 自动垃圾回收的默认阈值(已退休节点的数量)。
59pub(crate) const AUTO_RECLAIM_THRESHOLD: usize = 16;
60
61/// Represents a reader that is not currently pinned to any version.
62/// 表示当前未被钉住到任何版本的读者。
63pub(crate) const INACTIVE_VERSION: usize = usize::MAX;
64
65/// A single-writer, multi-reader cell with version-based garbage collection.
66///
67/// `SwmrCell` provides safe concurrent access where one writer can update the value
68/// and multiple readers can read it concurrently. Readers access the value by
69/// creating a `LocalReader` and pinning it.
70///
71/// 单写多读单元,带有基于版本的垃圾回收。
72///
73/// `SwmrCell` 提供安全的并发访问,其中一个写入者可以更新值,
74/// 多个读者可以并发读取它。读者通过创建 `LocalReader` 并 pin 来访问值。
75pub struct SwmrCell<T: 'static, const RP: bool = false> {
76 shared: Arc<SharedState<T, RP>>,
77 garbage: GarbageSet<T>,
78 auto_reclaim_threshold: Option<usize>,
79}
80
81impl<T: 'static> SwmrCell<T, false> {
82 /// Create a new SWMR cell with default settings and the given initial value.
83 ///
84 /// 使用默认设置和给定的初始值创建一个新的 SWMR 单元。
85 #[inline]
86 pub fn new(data: T) -> Self {
87 Self::builder().build(data)
88 }
89
90 /// Returns a builder for configuring the SWMR cell.
91 ///
92 /// 返回用于配置 SWMR 单元的构建器。
93 #[inline]
94 pub fn builder() -> SwmrCellBuilder<T, false> {
95 SwmrCellBuilder {
96 auto_reclaim_threshold: Some(AUTO_RECLAIM_THRESHOLD),
97 marker: PhantomData,
98 }
99 }
100}
101
102impl<T: 'static, const RP: bool> SwmrCell<T, RP> {
103 /// Create a new `LocalReader` for reading.
104 ///
105 /// Each thread should create its own `LocalReader` and reuse it.
106 /// `LocalReader` is `!Sync` and should not be shared between threads.
107 ///
108 /// 创建一个新的 `LocalReader` 用于读取。
109 /// 每个线程应该创建自己的 `LocalReader` 并重复使用。
110 /// `LocalReader` 是 `!Sync` 的,不应在线程之间共享。
111 #[inline]
112 pub fn local_reader(&self) -> LocalReader<T, RP> {
113 LocalReader::new(self.shared.clone())
114 }
115
116 /// Create a new `SwmrReaderFactory` that can be shared across threads.
117 ///
118 /// `SwmrReaderFactory` is `Sync` + `Clone` and acts as a factory for `LocalReader`s.
119 /// This is useful for distributing reader creation capability to other threads.
120 ///
121 /// 创建一个新的 `SwmrReaderFactory`,可以在线程之间共享。
122 /// `SwmrReaderFactory` 是 `Sync` + `Clone` 的,充当 `LocalReader` 的工厂。
123 /// 这对于将读者创建能力分发给其他线程很有用。
124 #[inline]
125 pub fn reader_factory(&self) -> SwmrReaderFactory<T, RP> {
126 SwmrReaderFactory {
127 shared: self.shared.clone(),
128 }
129 }
130
131 /// Store a new value, making it visible to readers.
132 /// The old value is retired and will be garbage collected.
133 ///
134 /// This operation increments the global version.
135 ///
136 /// 存储新值,使其对读者可见。
137 /// 旧值已退休,将被垃圾回收。
138 /// 此操作会增加全局版本。
139 pub fn store(&mut self, data: T) {
140 let new_ptr = Box::into_raw(Box::new(data));
141 let old_ptr = self.shared.ptr.swap(new_ptr, Ordering::Release);
142
143 // Increment global version.
144 // The old value belongs to the previous version (the one before this increment).
145 // 增加全局版本。
146 // 旧值属于前一个版本(此次增加之前的那个)。
147 let old_version = self.shared.global_version.fetch_add(1, Ordering::AcqRel);
148
149 if !old_ptr.is_null() {
150 // Safe because we just swapped it out and we own the writer
151 unsafe {
152 self.garbage.add(Box::from_raw(old_ptr), old_version);
153 }
154 }
155
156 // Auto-reclaim
157 if let Some(threshold) = self.auto_reclaim_threshold
158 && self.garbage.len() > threshold
159 {
160 self.collect();
161 }
162 }
163
164 /// Get a reference to the previously stored value, if any.
165 ///
166 /// Returns `None` if no previous value exists (i.e., only the initial value has been stored).
167 ///
168 /// **Note**: The previous value is guaranteed not to be garbage collected because
169 /// `collect()` uses `safety_limit = current_version - 2`, which always preserves
170 /// the most recently retired value (version = current_version - 1).
171 ///
172 /// This is useful for comparing the current value with the previous one,
173 /// or for implementing undo/rollback logic.
174 ///
175 /// 获取上一个存储值的引用(如果存在)。
176 ///
177 /// 如果不存在上一个值(即只存储了初始值),则返回 `None`。
178 ///
179 /// **注意**:上一个值保证不会被垃圾回收,因为 `collect()` 使用 `safety_limit = current_version - 2`,
180 /// 这始终保留最近退休的值(版本 = current_version - 1)。
181 ///
182 /// 这对于将当前值与上一个值进行比较,或实现撤销/回滚逻辑很有用。
183 ///
184 /// # Example
185 ///
186 /// ```rust
187 /// use swmr_cell::SwmrCell;
188 ///
189 /// let mut cell = SwmrCell::new(1);
190 /// assert!(cell.previous().is_none()); // No previous value yet
191 ///
192 /// cell.store(2);
193 /// assert_eq!(cell.previous(), Some(&1)); // Previous value is 1
194 ///
195 /// cell.store(3);
196 /// assert_eq!(cell.previous(), Some(&2)); // Previous value is 2
197 /// ```
198 #[inline]
199 pub fn previous(&self) -> Option<&T> {
200 self.garbage.back()
201 }
202
203 /// Get a reference to the current value (writer-only, no pinning required).
204 ///
205 /// This is only accessible from the writer thread since `SwmrCell` is `!Sync`.
206 ///
207 /// 获取当前值的引用(仅写者可用,无需 pin)。
208 /// 这只能从写者线程访问,因为 `SwmrCell` 是 `!Sync` 的。
209 #[inline]
210 pub fn get(&self) -> &T {
211 // Safety: We own the writer, and the current pointer is always valid.
212 // 安全性:我们拥有写者,当前指针始终有效。
213 unsafe { &*self.shared.ptr.load(Ordering::Acquire) }
214 }
215
216 /// Update the value using a closure.
217 ///
218 /// The closure receives the current value and should return the new value.
219 /// This is equivalent to `cell.store(f(cell.get().clone()))` but more ergonomic.
220 ///
221 /// 使用闭包更新值。
222 /// 闭包接收当前值并应返回新值。
223 /// 这相当于 `cell.store(f(cell.get().clone()))` 但更符合人体工程学。
224 #[inline]
225 pub fn update<F>(&mut self, f: F)
226 where
227 F: FnOnce(&T) -> T,
228 {
229 let new_value = f(self.get());
230 self.store(new_value);
231 }
232
233 /// Get the current global version.
234 ///
235 /// The version is incremented each time `store()` or `replace()` is called.
236 ///
237 /// 获取当前全局版本。
238 /// 每次调用 `store()` 或 `replace()` 时版本会增加。
239 #[inline]
240 pub fn version(&self) -> usize {
241 self.shared.global_version.load(Ordering::Acquire)
242 }
243
244 /// Get the number of retired objects waiting for garbage collection.
245 ///
246 /// 获取等待垃圾回收的已退休对象数量。
247 #[inline]
248 pub fn garbage_count(&self) -> usize {
249 self.garbage.len()
250 }
251
252 /// Manually trigger garbage collection.
253 /// 手动触发垃圾回收。
254 pub fn collect(&mut self) {
255 // In this design, we don't necessarily advance the version just for collection.
256 // But we need to find min_active_version.
257
258 let current_version = self.shared.global_version.load(Ordering::Acquire);
259
260 // Safety limit ensures we never reclaim the most recent retired value (previous).
261 // The most recent retired value has version = current_version - 1.
262 // With safety_limit = current_version - 2, we only reclaim versions < current_version - 2,
263 // so the previous value (version = current_version - 1) is always preserved.
264 // 安全限制确保我们永远不会回收最近退休的值(previous)。
265 // 最近退休的值的版本 = current_version - 1。
266 // 使用 safety_limit = current_version - 2,我们只回收版本 < current_version - 2 的,
267 // 因此上一个值(版本 = current_version - 1)始终被保留。
268 let safety_limit = current_version.saturating_sub(2);
269
270 let mut min_active = current_version;
271
272 // Force memory visibility of any preceding stores and serialize reader streams.
273 // This ensures we see any active readers that have completed their light_barrier.
274 heavy_barrier::<RP>();
275
276 let mut shared_readers = self.shared.readers.lock();
277
278 for arc_slot in shared_readers.iter() {
279 let version = arc_slot.active_version.load(Ordering::Acquire);
280 if version != INACTIVE_VERSION {
281 min_active = min_active.min(version);
282 }
283 }
284
285 // Clean up dead reader slots (strong_count == 1 means only SharedState holds it)
286 // 清理死读者槽(strong_count == 1 表示只有 SharedState 持有它)
287 shared_readers.retain(|arc_slot| Arc::strong_count(arc_slot) > 1);
288
289 drop(shared_readers);
290
291 let reclaim_threshold = min_active.min(safety_limit);
292
293 self.shared
294 .min_active_version
295 .store(reclaim_threshold, Ordering::Release);
296
297 self.garbage.collect(reclaim_threshold, current_version);
298 }
299}
300
301/// A handle for creating `LocalReader`s that can be shared across threads.
302///
303/// Unlike `LocalReader`, which is `!Sync` and bound to a single thread,
304/// `SwmrReaderFactory` is `Sync` and `Clone`. It holds a reference to the shared state
305/// but does not register a reader slot until `local_reader()` is called.
306///
307/// 可以跨线程共享的用于创建 `LocalReader` 的句柄。
308///
309/// 与 `!Sync` 且绑定到单个线程的 `LocalReader` 不同,
310/// `SwmrReaderFactory` 是 `Sync` 和 `Clone` 的。它持有对共享状态的引用,
311/// 但直到调用 `local_reader()` 时才注册读者槽。
312pub struct SwmrReaderFactory<T: 'static, const RP: bool = false> {
313 shared: Arc<SharedState<T, RP>>,
314}
315
316impl<T: 'static, const RP: bool> SwmrReaderFactory<T, RP> {
317 /// Create a new `LocalReader` for the current thread.
318 ///
319 /// 为当前线程创建一个新的 `LocalReader`。
320 #[inline]
321 pub fn local_reader(&self) -> LocalReader<T, RP> {
322 LocalReader::new(self.shared.clone())
323 }
324}
325
326impl<T: 'static, const RP: bool> Clone for SwmrReaderFactory<T, RP> {
327 #[inline]
328 fn clone(&self) -> Self {
329 Self {
330 shared: self.shared.clone(),
331 }
332 }
333}
334
335impl<T: 'static, const RP: bool> fmt::Debug for SwmrReaderFactory<T, RP> {
336 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
337 f.debug_struct("SwmrReaderFactory")
338 .field("read_preferred", &RP)
339 .finish()
340 }
341}
342
343/// A builder for configuring and creating a SWMR cell.
344///
345/// 用于配置和创建 SWMR 单元的构建器。
346pub struct SwmrCellBuilder<T, const RP: bool = false> {
347 auto_reclaim_threshold: Option<usize>,
348 marker: PhantomData<T>,
349}
350
351impl<T: 'static, const RP: bool> SwmrCellBuilder<T, RP> {
352 /// Sets the threshold for automatic garbage reclamation.
353 ///
354 /// When the number of retired objects exceeds this threshold,
355 /// garbage collection is triggered automatically during `store`.
356 ///
357 /// Set to `None` to disable automatic reclamation.
358 /// Default is `Some(64)`.
359 ///
360 /// 设置自动垃圾回收的阈值。
361 /// 当已退休对象的数量超过此阈值时,将在 `store` 期间自动触发垃圾回收。
362 /// 设置为 `None` 以禁用自动回收。
363 /// 默认为 `Some(64)`。
364 #[inline]
365 pub fn auto_reclaim_threshold(mut self, threshold: Option<usize>) -> Self {
366 self.auto_reclaim_threshold = threshold;
367 self
368 }
369
370 /// Creates a new SWMR cell with the configured settings and initial value.
371 ///
372 /// 使用配置的设置和初始值创建一个新的 SWMR 单元。
373 pub fn build(self, data: T) -> SwmrCell<T, RP> {
374 let shared = Arc::new(SharedState {
375 global_version: AtomicUsize::new(0),
376 min_active_version: AtomicUsize::new(0),
377 ptr: AtomicPtr::new(Box::into_raw(Box::new(data))),
378 readers: Mutex::new(Vec::new()),
379 });
380
381 SwmrCell {
382 shared,
383 garbage: GarbageSet::new(),
384 auto_reclaim_threshold: self.auto_reclaim_threshold,
385 }
386 }
387}
388
389impl<T: 'static> SwmrCellBuilder<T, false> {
390 /// Enable read-preferred mode.
391 ///
392 /// 启用读优先模式。
393 #[inline]
394 pub fn read_preferred(self) -> SwmrCellBuilder<T, true> {
395 SwmrCellBuilder {
396 auto_reclaim_threshold: self.auto_reclaim_threshold,
397 marker: PhantomData,
398 }
399 }
400}
401
402/// Manages retired objects and their reclamation.
403///
404/// This struct encapsulates the logic for:
405/// - Storing retired objects in version-ordered queue.
406/// - Reclaiming objects when they are safe to delete.
407///
408/// 管理已退休对象及其回收。
409///
410/// 此结构体封装了以下逻辑:
411/// - 将已退休对象存储在按版本排序的队列中。
412/// - 当对象可以安全删除时进行回收。
413struct GarbageSet<T> {
414 /// Queue of garbage items, ordered by version.
415 /// Each element is (version, node).
416 queue: VecDeque<(usize, Box<T>)>,
417}
418
419impl<T> GarbageSet<T> {
420 /// Create a new empty garbage set.
421 /// 创建一个新的空垃圾集合。
422 fn new() -> Self {
423 Self {
424 queue: VecDeque::new(),
425 }
426 }
427
428 /// Get the total number of retired objects.
429 /// 获取已退休对象的总数。
430 #[inline]
431 fn len(&self) -> usize {
432 self.queue.len()
433 }
434
435 /// Get a reference to the most recently retired object (the previous value).
436 /// 获取最近退休对象(上一个值)的引用。
437 #[inline]
438 fn back(&self) -> Option<&T> {
439 self.queue.back().map(|(_, boxed)| boxed.as_ref())
440 }
441
442 /// Add a retired node to the set for the current version.
443 ///
444 /// 将已退休节点添加到当前版本的集合中。
445 #[inline]
446 fn add(&mut self, node: Box<T>, current_version: usize) {
447 self.queue.push_back((current_version, node));
448 }
449
450 /// Reclaim garbage that is safe to delete.
451 ///
452 /// Garbage from versions older than `min_active_version` is dropped.
453 ///
454 /// 回收可以安全删除的垃圾。
455 ///
456 /// 来自比 `min_active_version` 更旧的版本的垃圾将被 drop。
457 #[inline]
458 fn collect(&mut self, min_active_version: usize, _current_version: usize) {
459 // We reclaim everything that is strictly older than min_active_version.
460 // If min_active_version == current_version, then everything (all < current_version) is reclaimed.
461 while let Some((version, _)) = self.queue.front() {
462 if *version >= min_active_version {
463 break;
464 }
465 self.queue.pop_front(); // Box<T> is dropped here
466 }
467 }
468}
469
470/// A slot allocated for a reader thread to record its active version.
471///
472/// Cache-aligned to prevent false sharing between readers.
473///
474/// 为读者线程分配的槽,用于记录其活跃版本。
475/// 缓存对齐以防止读者之间的伪共享。
476#[derive(Debug)]
477#[repr(align(64))]
478pub(crate) struct ReaderSlot {
479 /// The version currently being accessed by the reader, or INACTIVE_VERSION.
480 /// 读者当前访问的版本,或 INACTIVE_VERSION。
481 pub(crate) active_version: AtomicUsize,
482}
483
484/// Global shared state for the version GC domain.
485///
486/// Contains the global version, the minimum active version, the data pointer, and the list of reader slots.
487///
488/// version GC 域的全局共享状态。
489/// 包含全局版本、最小活跃版本、数据指针和读者槽列表。
490#[repr(align(64))]
491pub(crate) struct SharedState<T: 'static, const RP: bool = false> {
492 /// The global monotonic version counter.
493 /// 全局单调版本计数器。
494 pub(crate) global_version: AtomicUsize,
495 /// The minimum version among all active readers (cached for performance).
496 /// 所有活跃读者中的最小版本(为性能而缓存)。
497 pub(crate) min_active_version: AtomicUsize,
498 /// The current data pointer.
499 /// 当前数据指针。
500 pub(crate) ptr: AtomicPtr<T>,
501 /// List of all registered reader slots. Protected by a Mutex.
502 /// 所有注册读者槽的列表。由 Mutex 保护。
503 pub(crate) readers: Mutex<Vec<Arc<ReaderSlot>>>,
504}
505
506impl<T: 'static, const RP: bool> Drop for SharedState<T, RP> {
507 fn drop(&mut self) {
508 // Drop the current value held by ptr to avoid leaking it.
509 // Drop ptr 持有的当前值,以避免泄漏。
510 let ptr = self.ptr.load(Ordering::Acquire);
511 if !ptr.is_null() {
512 unsafe {
513 drop(Box::from_raw(ptr));
514 }
515 }
516 }
517}
518
519/// A reader thread's local version state.
520///
521/// Each reader thread should create exactly one `LocalReader` via `SwmrCell::local_reader()`.
522/// It is `!Sync` (due to `Cell`) and must be stored per-thread.
523///
524/// The `LocalReader` is used to:
525/// - Pin the thread to the current version via `pin()`.
526/// - Obtain a `PinGuard` that protects access to values and can be dereferenced.
527///
528/// **Thread Safety**: `LocalReader` is not `Sync` and must be used by only one thread.
529///
530/// 读者线程的本地版本状态。
531/// 每个读者线程应该通过 `SwmrCell::local_reader()` 创建恰好一个 `LocalReader`。
532/// 它是 `!Sync` 的(因为 `Cell`),必须在每个线程中存储。
533/// `LocalReader` 用于:
534/// - 通过 `pin()` 将线程钉住到当前版本。
535/// - 获取保护对值访问的 `PinGuard`,可以解引用来读取值。
536/// **线程安全性**:`LocalReader` 不是 `Sync` 的,必须仅由一个线程使用。
537pub struct LocalReader<T: 'static, const RP: bool = false> {
538 slot: Arc<ReaderSlot>,
539 shared: Arc<SharedState<T, RP>>,
540 pin_count: Cell<usize>,
541}
542
543impl<T: 'static, const RP: bool> LocalReader<T, RP> {
544 fn new(shared: Arc<SharedState<T, RP>>) -> Self {
545 let slot = Arc::new(ReaderSlot {
546 active_version: AtomicUsize::new(INACTIVE_VERSION),
547 });
548
549 // Register the reader immediately in the shared readers list
550 shared.readers.lock().push(Arc::clone(&slot));
551
552 LocalReader {
553 slot,
554 shared,
555 pin_count: Cell::new(0),
556 }
557 }
558
559 /// Pin this thread to the current version.
560 ///
561 /// Returns a `PinGuard` that keeps the thread pinned for its lifetime.
562 /// The guard can be dereferenced to access the current value.
563 ///
564 /// **Reentrancy**: This method is reentrant. Multiple calls can be nested, and the thread
565 /// remains pinned until all returned guards are dropped. You can also clone a guard to create
566 /// additional references: `let guard2 = guard1.clone();`
567 ///
568 /// **Example**:
569 /// ```ignore
570 /// let local = cell.local_reader();
571 /// let guard1 = local.pin();
572 /// let value = *guard1; // Dereference to read
573 /// let guard2 = local.pin(); // Reentrant call
574 /// let guard3 = guard1.clone(); // Clone for nested scope
575 /// // Thread remains pinned until all three guards are dropped
576 /// ```
577 ///
578 /// While pinned, the thread is considered "active" at a particular version,
579 /// and the garbage collector will not reclaim data from that version.
580 ///
581 /// 将此线程钉住到当前版本。
582 ///
583 /// 返回一个 `PinGuard`,在其生命周期内保持线程被钉住。
584 /// 可以解引用该守卫来访问当前值。
585 ///
586 /// **可重入性**:此方法是可重入的。多个调用可以嵌套,线程在所有返回的守卫被 drop 之前保持被钉住。
587 /// 你也可以克隆一个守卫来创建额外的引用:`let guard2 = guard1.clone();`
588 ///
589 /// **示例**:
590 /// ```ignore
591 /// let local = cell.local_reader();
592 /// let guard1 = local.pin();
593 /// let value = *guard1; // 解引用来读取
594 /// let guard2 = local.pin(); // 可重入调用
595 /// let guard3 = guard1.clone(); // 克隆用于嵌套作用域
596 /// // 线程保持被钉住直到所有三个守卫被 drop
597 /// ```
598 ///
599 /// 当被钉住时,线程被认为在特定版本"活跃",垃圾回收器不会回收该版本的数据。
600 /// Check if this reader is currently pinned.
601 ///
602 /// 检查此读者当前是否被 pin。
603 #[inline]
604 pub fn is_pinned(&self) -> bool {
605 self.pin_count.get() > 0
606 }
607
608 /// Get the current global version.
609 ///
610 /// Note: This returns the global version, not the pinned version.
611 /// To get the pinned version, use `PinGuard::version()`.
612 ///
613 /// 获取当前全局版本。
614 /// 注意:这返回全局版本,而不是 pin 的版本。
615 /// 要获取 pin 的版本,请使用 `PinGuard::version()`。
616 #[inline]
617 pub fn version(&self) -> usize {
618 self.shared.global_version.load(Ordering::Acquire)
619 }
620
621 #[inline]
622 pub fn pin(&self) -> PinGuard<'_, T, RP> {
623 let pin_count = self.pin_count.get();
624
625 // Reentrant pin: the version is already protected by the outer pin.
626 // Just increment count and reuse the existing pinned pointer.
627 // 可重入 pin:版本已经被外层 pin 保护。
628 // 只需增加计数并复用现有的 pinned 指针。
629 if pin_count > 0 {
630 self.pin_count.set(pin_count + 1);
631
632 // Load the pointer that corresponds to our already-pinned version.
633 // Since we're reentrant, we should see the same or newer pointer.
634 // 加载与我们已 pin 版本对应的指针。
635 // 由于是可重入的,我们应该看到相同或更新的指针。
636 let ptr = self.shared.ptr.load(Ordering::Acquire);
637 let version = self.slot.active_version.load(Ordering::Acquire);
638
639 return PinGuard {
640 local: self,
641 ptr,
642 version,
643 };
644 }
645
646 // First pin: need to acquire a version and validate it.
647 // 首次 pin:需要获取版本并验证。
648 loop {
649 let current_version = self.shared.global_version.load(Ordering::Acquire);
650
651 self.slot
652 .active_version
653 .store(current_version, Ordering::Release);
654
655 // Light barrier coupled with Writer's Heavy barrier prevents Store-Load reordering.
656 light_barrier::<RP>();
657
658 // Check if our version is still valid (not yet reclaimed).
659 // 检查我们的版本是否仍然有效(尚未被回收)。
660 let min_active = self.shared.min_active_version.load(Ordering::Acquire);
661
662 if current_version >= min_active {
663 break;
664 }
665
666 // Version was reclaimed between our read and store.
667 // Retry with a fresh version.
668 // 版本在我们读取和存储之间被回收了。
669 // 用新版本重试。
670 core::hint::spin_loop();
671 }
672
673 self.pin_count.set(1);
674
675 // Capture the pointer and version at pin time for snapshot semantics.
676 // 在 pin 时捕获指针和版本以实现快照语义。
677 let ptr = self.shared.ptr.load(Ordering::Acquire);
678 let version = self.slot.active_version.load(Ordering::Acquire);
679
680 PinGuard {
681 local: self,
682 ptr,
683 version,
684 }
685 }
686
687 /// Create a new `SwmrReaderFactory` from this `LocalReader`.
688 ///
689 /// `SwmrReaderFactory` is `Sync` + `Clone` and acts as a factory for `LocalReader`s.
690 /// This is equivalent to calling `swmr_cell.reader_factory()`, but using the `LocalReader`'s reference to the shared state.
691 ///
692 /// 从此 `LocalReader` 创建一个新的 `SwmrReaderFactory`。
693 /// `SwmrReaderFactory` 是 `Sync` + `Clone` 的,充当 `LocalReader` 的工厂。
694 /// 这相当于调用 `swmr_cell.reader_factory()`,但使用 `LocalReader` 对共享状态的引用。
695 #[inline]
696 pub fn reader_factory(&self) -> SwmrReaderFactory<T, RP> {
697 SwmrReaderFactory {
698 shared: self.shared.clone(),
699 }
700 }
701
702 /// Convert this `LocalReader` into a `SwmrReaderFactory`.
703 ///
704 /// This consumes the `LocalReader` and returns a `SwmrReaderFactory`
705 /// that can be sent to another thread to create new `LocalReader`s.
706 ///
707 /// 将此 `LocalReader` 转换为 `SwmrReaderFactory`。
708 /// 这会消耗 `LocalReader` 并返回一个 `SwmrReaderFactory`,
709 /// 该 `SwmrReaderFactory` 可以发送到另一个线程以创建新 `LocalReader`。
710 #[inline]
711 pub fn into_swmr(self) -> SwmrReaderFactory<T, RP> {
712 SwmrReaderFactory {
713 shared: self.shared.clone(),
714 }
715 }
716}
717
718impl<T: 'static, const RP: bool> Clone for LocalReader<T, RP> {
719 #[inline]
720 fn clone(&self) -> Self {
721 Self::new(self.shared.clone())
722 }
723}
724
725/// A guard that keeps the current thread pinned to a version.
726///
727/// `PinGuard` is obtained by calling `LocalReader::pin()`.
728/// It implements `Deref<Target = T>` to allow reading the current value.
729/// It is `!Send` and `!Sync` because it references a `!Sync` `LocalReader`.
730/// Its lifetime is bound to the `LocalReader` it came from.
731///
732/// While a `PinGuard` is held, the thread is considered "active" at a particular version,
733/// and the garbage collector will not reclaim data from that version.
734///
735/// `PinGuard` supports internal cloning via reference counting (increments the pin count),
736/// allowing nested pinning. The thread remains pinned until all cloned guards are dropped.
737///
738/// **Safety**: The `PinGuard` is the mechanism that ensures safe concurrent access to
739/// shared values. Readers must always hold a valid `PinGuard` when accessing
740/// shared data.
741///
742/// 一个保持当前线程被钉住到一个版本的守卫。
743/// `PinGuard` 通过调用 `LocalReader::pin()` 获得。
744/// 它实现了 `Deref<Target = T>`,允许读取当前值。
745/// 它是 `!Send` 和 `!Sync` 的,因为它引用了一个 `!Sync` 的 `LocalReader`。
746/// 它的生命周期被绑定到它来自的 `LocalReader`。
747/// 当 `PinGuard` 被持有时,线程被认为在特定版本"活跃",
748/// 垃圾回收器不会回收该版本的数据。
749/// `PinGuard` 支持通过引用计数的内部克隆(增加 pin 计数),允许嵌套 pinning。
750/// 线程保持被钉住直到所有克隆的守卫被 drop。
751/// **安全性**:`PinGuard` 是确保对值安全并发访问的机制。
752/// 读者在访问共享数据时必须始终持有有效的 `PinGuard`。
753#[must_use]
754pub struct PinGuard<'a, T: 'static, const RP: bool = false> {
755 local: &'a LocalReader<T, RP>,
756 /// The pointer captured at pin time for snapshot semantics.
757 /// 在 pin 时捕获的指针,用于快照语义。
758 ptr: *const T,
759 /// The version at pin time.
760 /// pin 时的版本。
761 version: usize,
762}
763
764impl<T: 'static> PinGuard<'_, T> {
765 /// Get the version that this guard is pinned to.
766 ///
767 /// 获取此守卫被 pin 到的版本。
768 #[inline]
769 pub fn version(&self) -> usize {
770 self.version
771 }
772}
773
774impl<'a, T> Deref for PinGuard<'a, T> {
775 type Target = T;
776
777 /// Dereference to access the pinned value.
778 ///
779 /// Returns a reference to the value that was current when this guard was created.
780 /// This provides snapshot semantics - the value won't change during the guard's lifetime.
781 ///
782 /// 解引用以访问被 pin 的值。
783 ///
784 /// 返回对创建此守卫时当前值的引用。
785 /// 这提供了快照语义 - 在守卫的生命周期内值不会改变。
786 #[inline]
787 fn deref(&self) -> &T {
788 // Safety: pin() guarantees pinned_version >= min_active,
789 // and the pointer was captured at pin time.
790 // The value is valid as long as guard is held.
791 // 安全性:pin() 保证 pinned_version >= min_active,
792 // 并且指针在 pin 时被捕获。
793 // 只要 guard 被持有,值就是有效的。
794 unsafe { &*self.ptr }
795 }
796}
797
798impl<'a, T> Clone for PinGuard<'a, T> {
799 /// Clone this guard to create a nested pin.
800 ///
801 /// Cloning increments the pin count, and the thread remains pinned until all cloned guards
802 /// are dropped. This allows multiple scopes to hold pins simultaneously.
803 ///
804 /// 克隆此守卫以创建嵌套 pin。
805 ///
806 /// 克隆会增加 pin 计数,线程保持被钉住直到所有克隆的守卫被 drop。
807 /// 这允许多个作用域同时持有 pin。
808 #[inline]
809 fn clone(&self) -> Self {
810 let pin_count = self.local.pin_count.get();
811
812 assert!(
813 pin_count > 0,
814 "BUG: Cloning a PinGuard in an unpinned state (pin_count = 0). \
815 This indicates incorrect API usage or a library bug."
816 );
817
818 self.local.pin_count.set(pin_count + 1);
819
820 PinGuard {
821 local: self.local,
822 ptr: self.ptr,
823 version: self.version,
824 }
825 }
826}
827
828impl<'a, T, const RP: bool> Drop for PinGuard<'a, T, RP> {
829 #[inline]
830 fn drop(&mut self) {
831 let pin_count = self.local.pin_count.get();
832
833 assert!(
834 pin_count > 0,
835 "BUG: Dropping a PinGuard in an unpinned state (pin_count = 0). \
836 This indicates incorrect API usage or a library bug."
837 );
838
839 if pin_count == 1 {
840 self.local
841 .slot
842 .active_version
843 .store(INACTIVE_VERSION, Ordering::Release);
844 }
845
846 self.local.pin_count.set(pin_count - 1);
847 }
848}
849
850impl<T: 'static> AsRef<T> for PinGuard<'_, T> {
851 #[inline]
852 fn as_ref(&self) -> &T {
853 self.deref()
854 }
855}
856
857// ============================================================================
858// Standard Trait Implementations
859// 标准 trait 实现
860// ============================================================================
861
862impl<T: Default + 'static> Default for SwmrCell<T> {
863 /// Create a new SWMR cell with the default value.
864 ///
865 /// 使用默认值创建一个新的 SWMR 单元。
866 #[inline]
867 fn default() -> Self {
868 Self::new(T::default())
869 }
870}
871
872impl<T: 'static> From<T> for SwmrCell<T> {
873 /// Create a new SWMR cell from a value.
874 ///
875 /// 从一个值创建一个新的 SWMR 单元。
876 #[inline]
877 fn from(value: T) -> Self {
878 Self::new(value)
879 }
880}
881
882impl<T: fmt::Debug + 'static> fmt::Debug for SwmrCell<T> {
883 #[inline]
884 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
885 f.debug_struct("SwmrCell")
886 .field("value", self.get())
887 .field("version", &self.version())
888 .field("garbage_count", &self.garbage_count())
889 .finish()
890 }
891}
892
893impl<T: 'static> fmt::Debug for LocalReader<T> {
894 #[inline]
895 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896 f.debug_struct("LocalReader")
897 .field("is_pinned", &self.is_pinned())
898 .field("version", &self.version())
899 .finish()
900 }
901}
902
903impl<T: fmt::Debug + 'static> fmt::Debug for PinGuard<'_, T> {
904 #[inline]
905 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
906 f.debug_struct("PinGuard")
907 .field("value", &self.deref())
908 .field("version", &self.version)
909 .finish()
910 }
911}