swmr_cell/
lib.rs

1//! # SWMR Version-Based Single Object
2//!
3//! This crate provides a single-writer, multi-reader (SWMR) cell that supports
4//! concurrent wait-free reads and lock-free writes using version-based garbage collection.
5//!
6//! ## Core Concepts
7//!
8//! - **Single Object**: The `swmr_cell` library manages a single versioned object per `SwmrCell`.
9//! - **Version**: The version counter represents the state of the object. Each write increments the version.
10//! - **Pinning**: Readers pin the current version when they start reading, preventing the writer from reclaiming that version (and any older versions still visible to other readers) until they are done.
11//!
12//! ## Typical Usage
13//!
14//! ```rust
15//! use swmr_cell::SwmrCell;
16//!
17//! // 1. Create a new SWMR cell with an initial value
18//! let mut cell = SwmrCell::new(42i32);
19//!
20//! // 2. Create a local reader for this thread (or pass to another thread)
21//! let local = cell.local();
22//!
23//! // 3. Pin and read the value by dereferencing the guard
24//! let guard = local.pin();
25//! assert_eq!(*guard, 42);
26//! drop(guard);
27//!
28//! // 4. Writer updates the value
29//! cell.store(100i32);
30//!
31//! // 5. Read the new value
32//! let guard = local.pin();
33//! assert_eq!(*guard, 100);
34//! drop(guard);
35//!
36//! // 6. Manually collect garbage (optional, happens automatically too)
37//! cell.collect();
38//! ```
39mod sync;
40
41#[cfg(test)]
42mod tests;
43
44use crate::sync::*;
45use std::{collections::VecDeque, marker::PhantomData, ops::Deref, vec::Vec};
46
47/// Default threshold for automatic garbage reclamation (count of retired nodes).
48/// 自动垃圾回收的默认阈值(已退休节点的数量)。
49pub(crate) const AUTO_RECLAIM_THRESHOLD: usize = 16;
50
51/// Represents a reader that is not currently pinned to any version.
52/// 表示当前未被钉住到任何版本的读者。
53pub(crate) const INACTIVE_VERSION: usize = usize::MAX;
54
55
56/// A single-writer, multi-reader cell with version-based garbage collection.
57///
58/// `SwmrCell` provides safe concurrent access where one writer can update the value
59/// and multiple readers can read it concurrently. Readers access the value by
60/// creating a `LocalReader` and pinning it.
61///
62/// 单写多读单元,带有基于版本的垃圾回收。
63///
64/// `SwmrCell` 提供安全的并发访问,其中一个写入者可以更新值,
65/// 多个读者可以并发读取它。读者通过创建 `LocalReader` 并 pin 来访问值。
66pub struct SwmrCell<T: 'static> {
67    shared: Arc<SharedState>,
68    ptr: Arc<AtomicPtr<T>>,
69    garbage: GarbageSet<T>,
70    auto_reclaim_threshold: Option<usize>,
71}
72
73impl<T: 'static> SwmrCell<T> {
74    /// Create a new SWMR cell with default settings and the given initial value.
75    ///
76    /// 使用默认设置和给定的初始值创建一个新的 SWMR 单元。
77    #[inline]
78    pub fn new(data: T) -> Self {
79        Self::builder().build(data)
80    }
81
82    /// Returns a builder for configuring the SWMR cell.
83    ///
84    /// 返回用于配置 SWMR 单元的构建器。
85    #[inline]
86    pub fn builder() -> SwmrCellBuilder<T> {
87        SwmrCellBuilder {
88            auto_reclaim_threshold: Some(AUTO_RECLAIM_THRESHOLD),
89            marker: PhantomData::default()
90        }
91    }
92
93    /// Create a new `LocalReader` for reading.
94    ///
95    /// Each thread should create its own `LocalReader` and reuse it.
96    /// `LocalReader` is `!Sync` and should not be shared between threads.
97    ///
98    /// 创建一个新的 `LocalReader` 用于读取。
99    /// 每个线程应该创建自己的 `LocalReader` 并重复使用。
100    /// `LocalReader` 是 `!Sync` 的,不应在线程之间共享。
101    #[inline]
102    pub fn local(&self) -> LocalReader<T> {
103        LocalReader::new(self.shared.clone(), self.ptr.clone())
104    }
105
106    /// Store a new value, making it visible to readers.
107    /// The old value is retired and will be garbage collected.
108    ///
109    /// This operation increments the global version.
110    ///
111    /// 存储新值,使其对读者可见。
112    /// 旧值已退休,将被垃圾回收。
113    /// 此操作会增加全局版本。
114    pub fn store(&mut self, data: T) {
115        let new_ptr = Box::into_raw(Box::new(data));
116        let old_ptr = self.ptr.swap(new_ptr, Ordering::Release);
117
118        // Increment global version.
119        // The old value belongs to the previous version (the one before this increment).
120        // 增加全局版本。
121        // 旧值属于前一个版本(此次增加之前的那个)。
122        let old_version = self.shared.global_version.fetch_add(1, Ordering::AcqRel);
123
124        if !old_ptr.is_null() {
125            // Safe because we just swapped it out and we own the writer
126            unsafe {
127                self.garbage.add(Box::from_raw(old_ptr), old_version);
128            }
129        }
130
131        // Auto-reclaim
132        if let Some(threshold) = self.auto_reclaim_threshold {
133            if self.garbage.len() > threshold {
134                self.collect();
135            }
136        }
137    }
138
139    /// Manually trigger garbage collection.
140    /// 手动触发垃圾回收。
141    pub fn collect(&mut self) {
142        // In this design, we don't necessarily advance the version just for collection.
143        // But we need to find min_active_version.
144
145        let current_version = self.shared.global_version.load(Ordering::Acquire);
146
147        let safety_limit = current_version.saturating_sub(2);
148
149        let mut min_active = current_version;
150
151        let mut shared_readers = self.shared.readers.lock();
152
153        for arc_slot in shared_readers.iter() {
154            let version = arc_slot.active_version.load(Ordering::Acquire);
155            if version != INACTIVE_VERSION {
156                min_active = min_active.min(version);
157            }
158        }
159
160        // Clean up dead reader slots (strong_count == 1 means only SharedState holds it)
161        // 清理死读者槽(strong_count == 1 表示只有 SharedState 持有它)
162        shared_readers.retain(|arc_slot| Arc::strong_count(arc_slot) > 1);
163        
164        drop(shared_readers);
165
166        let reclaim_threshold = min_active.min(safety_limit);
167
168        self.shared.min_active_version.store(reclaim_threshold, Ordering::Release);
169
170        self.garbage.collect(reclaim_threshold, current_version);
171    }
172}
173
174
175/// A builder for configuring and creating a SWMR cell.
176///
177/// 用于配置和创建 SWMR 单元的构建器。
178pub struct SwmrCellBuilder<T> {
179    auto_reclaim_threshold: Option<usize>,
180    marker: PhantomData<T>
181}
182
183impl<T: 'static> SwmrCellBuilder<T> {
184    /// Sets the threshold for automatic garbage reclamation.
185    ///
186    /// When the number of retired objects exceeds this threshold,
187    /// garbage collection is triggered automatically during `store`.
188    ///
189    /// Set to `None` to disable automatic reclamation.
190    /// Default is `Some(64)`.
191    ///
192    /// 设置自动垃圾回收的阈值。
193    /// 当已退休对象的数量超过此阈值时,将在 `store` 期间自动触发垃圾回收。
194    /// 设置为 `None` 以禁用自动回收。
195    /// 默认为 `Some(64)`。
196    #[inline]
197    pub fn auto_reclaim_threshold(mut self, threshold: Option<usize>) -> Self {
198        self.auto_reclaim_threshold = threshold;
199        self
200    }
201
202    /// Creates a new SWMR cell with the configured settings and initial value.
203    ///
204    /// 使用配置的设置和初始值创建一个新的 SWMR 单元。
205    pub fn build(self, data: T) -> SwmrCell<T> {
206        let shared = Arc::new(SharedState {
207            global_version: AtomicUsize::new(0),
208            min_active_version: AtomicUsize::new(0),
209            readers: Mutex::new(Vec::new()),
210        });
211
212        let ptr = Arc::new(AtomicPtr::new(Box::into_raw(Box::new(data))));
213
214        SwmrCell {
215            shared,
216            ptr,
217            garbage: GarbageSet::new(),
218            auto_reclaim_threshold: self.auto_reclaim_threshold,
219        }
220    }
221}
222
223
224
225/// Manages retired objects and their reclamation.
226///
227/// This struct encapsulates the logic for:
228/// - Storing retired objects in version-ordered queue.
229/// - Reclaiming objects when they are safe to delete.
230///
231/// 管理已退休对象及其回收。
232///
233/// 此结构体封装了以下逻辑:
234/// - 将已退休对象存储在按版本排序的队列中。
235/// - 当对象可以安全删除时进行回收。
236struct GarbageSet<T> {
237    /// Queue of garbage items, ordered by version.
238    /// Each element is (version, node).
239    queue: VecDeque<(usize, Box<T>)>,
240}
241
242impl<T> GarbageSet<T> {
243    /// Create a new empty garbage set.
244    /// 创建一个新的空垃圾集合。
245    fn new() -> Self {
246        Self {
247            queue: VecDeque::new(),
248        }
249    }
250
251    /// Get the total number of retired objects.
252    /// 获取已退休对象的总数。
253    #[inline]
254    fn len(&self) -> usize {
255        self.queue.len()
256    }
257
258    /// Add a retired node to the set for the current version.
259    ///
260    /// 将已退休节点添加到当前版本的集合中。
261    #[inline]
262    fn add(&mut self, node: Box<T>, current_version: usize) {
263        self.queue.push_back((current_version, node));
264    }
265
266    /// Reclaim garbage that is safe to delete.
267    ///
268    /// Garbage from versions older than `min_active_version` is dropped.
269    ///
270    /// 回收可以安全删除的垃圾。
271    ///
272    /// 来自比 `min_active_version` 更旧的版本的垃圾将被 drop。
273    #[inline]
274    fn collect(&mut self, min_active_version: usize, _current_version: usize) {
275        // We reclaim everything that is strictly older than min_active_version.
276        // If min_active_version == current_version, then everything (all < current_version) is reclaimed.
277        while let Some((version, _)) = self.queue.front() {
278            if *version >= min_active_version {
279                break;
280            }
281            self.queue.pop_front(); // Box<T> is dropped here
282        }
283    }
284}
285
286
287
288/// A slot allocated for a reader thread to record its active version.
289///
290/// Cache-aligned to prevent false sharing between readers.
291///
292/// 为读者线程分配的槽,用于记录其活跃版本。
293/// 缓存对齐以防止读者之间的伪共享。
294#[derive(Debug)]
295#[repr(align(64))]
296pub(crate) struct ReaderSlot {
297    /// The version currently being accessed by the reader, or INACTIVE_VERSION.
298    /// 读者当前访问的版本,或 INACTIVE_VERSION。
299    pub(crate) active_version: AtomicUsize,
300}
301
302/// Global shared state for the version GC domain.
303///
304/// Contains the global version, the minimum active version, and the list of reader slots.
305///
306/// version GC 域的全局共享状态。
307/// 包含全局版本、最小活跃版本和读者槽列表。
308#[derive(Debug)]
309#[repr(align(64))]
310pub(crate) struct SharedState {
311    /// The global monotonic version counter.
312    /// 全局单调版本计数器。
313    pub(crate) global_version: AtomicUsize,
314    /// The minimum version among all active readers (cached for performance).
315    /// 所有活跃读者中的最小版本(为性能而缓存)。
316    pub(crate) min_active_version: AtomicUsize,
317    /// List of all registered reader slots. Protected by a Mutex.
318    /// 所有注册读者槽的列表。由 Mutex 保护。
319    pub(crate) readers: Mutex<Vec<Arc<ReaderSlot>>>,
320}
321
322/// A reader thread's local version state.
323///
324/// Each reader thread should create exactly one `LocalReader` via `SwmrCell::local()`.
325/// It is `!Sync` (due to `Cell`) and must be stored per-thread.
326///
327/// The `LocalReader` is used to:
328/// - Pin the thread to the current version via `pin()`.
329/// - Obtain a `PinGuard` that protects access to values and can be dereferenced.
330///
331/// **Thread Safety**: `LocalReader` is not `Sync` and must be used by only one thread.
332///
333/// 读者线程的本地版本状态。
334/// 每个读者线程应该通过 `SwmrCell::local()` 创建恰好一个 `LocalReader`。
335/// 它是 `!Sync` 的(因为 `Cell`),必须在每个线程中存储。
336/// `LocalReader` 用于:
337/// - 通过 `pin()` 将线程钉住到当前版本。
338/// - 获取保护对值访问的 `PinGuard`,可以解引用来读取值。
339/// **线程安全性**:`LocalReader` 不是 `Sync` 的,必须仅由一个线程使用。
340pub struct LocalReader<T: 'static> {
341    slot: Arc<ReaderSlot>,
342    shared: Arc<SharedState>,
343    ptr: Arc<AtomicPtr<T>>,
344    pin_count: Cell<usize>,
345}
346
347impl<T: 'static> LocalReader<T> {
348    fn new(shared: Arc<SharedState>, ptr: Arc<AtomicPtr<T>>) -> Self {
349        let slot = Arc::new(ReaderSlot {
350            active_version: AtomicUsize::new(INACTIVE_VERSION),
351        });
352
353        // Register the reader immediately in the shared readers list
354        shared.readers.lock().push(Arc::clone(&slot));
355
356        LocalReader {
357            slot,
358            shared,
359            ptr,
360            pin_count: Cell::new(0),
361        }
362    }
363    
364    /// Pin this thread to the current version.
365    ///
366    /// Returns a `PinGuard` that keeps the thread pinned for its lifetime.
367    /// The guard can be dereferenced to access the current value.
368    ///
369    /// **Reentrancy**: This method is reentrant. Multiple calls can be nested, and the thread
370    /// remains pinned until all returned guards are dropped. You can also clone a guard to create
371    /// additional references: `let guard2 = guard1.clone();`
372    ///
373    /// **Example**:
374    /// ```ignore
375    /// let local = cell.local();
376    /// let guard1 = local.pin();
377    /// let value = *guard1;  // Dereference to read
378    /// let guard2 = local.pin();  // Reentrant call
379    /// let guard3 = guard1.clone();     // Clone for nested scope
380    /// // Thread remains pinned until all three guards are dropped
381    /// ```
382    ///
383    /// While pinned, the thread is considered "active" at a particular version,
384    /// and the garbage collector will not reclaim data from that version.
385    ///
386    /// 将此线程钉住到当前版本。
387    ///
388    /// 返回一个 `PinGuard`,在其生命周期内保持线程被钉住。
389    /// 可以解引用该守卫来访问当前值。
390    ///
391    /// **可重入性**:此方法是可重入的。多个调用可以嵌套,线程在所有返回的守卫被 drop 之前保持被钉住。
392    /// 你也可以克隆一个守卫来创建额外的引用:`let guard2 = guard1.clone();`
393    ///
394    /// **示例**:
395    /// ```ignore
396    /// let local = cell.local();
397    /// let guard1 = local.pin();
398    /// let value = *guard1;  // 解引用来读取
399    /// let guard2 = local.pin();  // 可重入调用
400    /// let guard3 = guard1.clone();     // 克隆用于嵌套作用域
401    /// // 线程保持被钉住直到所有三个守卫被 drop
402    /// ```
403    ///
404    /// 当被钉住时,线程被认为在特定版本"活跃",垃圾回收器不会回收该版本的数据。
405    #[inline]
406    pub fn pin(&self) -> PinGuard<'_, T> {
407        let pin_count = self.pin_count.get();
408
409        // Reentrant pin: the version is already protected by the outer pin.
410        // Just increment count and reuse the existing pinned pointer.
411        // 可重入 pin:版本已经被外层 pin 保护。
412        // 只需增加计数并复用现有的 pinned 指针。
413        if pin_count > 0 {
414            self.pin_count.set(pin_count + 1);
415            
416            // Load the pointer that corresponds to our already-pinned version.
417            // Since we're reentrant, we should see the same or newer pointer.
418            // 加载与我们已 pin 版本对应的指针。
419            // 由于是可重入的,我们应该看到相同或更新的指针。
420            let ptr = self.ptr.load(Ordering::Acquire);
421            
422            return PinGuard { local: self, ptr };
423        }
424
425        // First pin: need to acquire a version and validate it.
426        // 首次 pin:需要获取版本并验证。
427        loop {
428            let current_version = self.shared.global_version.load(Ordering::Acquire);
429            
430            self.slot
431                .active_version
432                .store(current_version, Ordering::Release);
433
434            // Check if our version is still valid (not yet reclaimed).
435            // 检查我们的版本是否仍然有效(尚未被回收)。
436            let min_active = self.shared.min_active_version.load(Ordering::Acquire);
437            
438            if current_version >= min_active {
439                break;
440            }
441            
442            // Version was reclaimed between our read and store.
443            // Retry with a fresh version.
444            // 版本在我们读取和存储之间被回收了。
445            // 用新版本重试。
446            std::hint::spin_loop();
447        }
448
449        self.pin_count.set(1);
450
451        // Capture the pointer at pin time for snapshot semantics.
452        // 在 pin 时捕获指针以实现快照语义。
453        let ptr = self.ptr.load(Ordering::Acquire);
454
455        PinGuard { local: self, ptr }
456    }
457}
458
459impl<T: 'static> Clone for LocalReader<T> {
460    #[inline]
461    fn clone(&self) -> Self {
462        Self::new(self.shared.clone(), self.ptr.clone())
463    }
464}
465
466/// A guard that keeps the current thread pinned to a version.
467///
468/// `PinGuard` is obtained by calling `LocalReader::pin()`.
469/// It implements `Deref<Target = T>` to allow reading the current value.
470/// It is `!Send` and `!Sync` because it references a `!Sync` `LocalReader`.
471/// Its lifetime is bound to the `LocalReader` it came from.
472///
473/// While a `PinGuard` is held, the thread is considered "active" at a particular version,
474/// and the garbage collector will not reclaim data from that version.
475///
476/// `PinGuard` supports internal cloning via reference counting (increments the pin count),
477/// allowing nested pinning. The thread remains pinned until all cloned guards are dropped.
478///
479/// **Safety**: The `PinGuard` is the mechanism that ensures safe concurrent access to
480/// shared values. Readers must always hold a valid `PinGuard` when accessing
481/// shared data.
482///
483/// 一个保持当前线程被钉住到一个版本的守卫。
484/// `PinGuard` 通过调用 `LocalReader::pin()` 获得。
485/// 它实现了 `Deref<Target = T>`,允许读取当前值。
486/// 它是 `!Send` 和 `!Sync` 的,因为它引用了一个 `!Sync` 的 `LocalReader`。
487/// 它的生命周期被绑定到它来自的 `LocalReader`。
488/// 当 `PinGuard` 被持有时,线程被认为在特定版本"活跃",
489/// 垃圾回收器不会回收该版本的数据。
490/// `PinGuard` 支持通过引用计数的内部克隆(增加 pin 计数),允许嵌套 pinning。
491/// 线程保持被钉住直到所有克隆的守卫被 drop。
492/// **安全性**:`PinGuard` 是确保对值安全并发访问的机制。
493/// 读者在访问共享数据时必须始终持有有效的 `PinGuard`。
494#[must_use]
495pub struct PinGuard<'a, T: 'static> {
496    local: &'a LocalReader<T>,
497    /// The pointer captured at pin time for snapshot semantics.
498    /// 在 pin 时捕获的指针,用于快照语义。
499    ptr: *const T,
500}
501
502impl<'a, T> Deref for PinGuard<'a, T> {
503    type Target = T;
504
505    /// Dereference to access the pinned value.
506    ///
507    /// Returns a reference to the value that was current when this guard was created.
508    /// This provides snapshot semantics - the value won't change during the guard's lifetime.
509    ///
510    /// 解引用以访问被 pin 的值。
511    ///
512    /// 返回对创建此守卫时当前值的引用。
513    /// 这提供了快照语义 - 在守卫的生命周期内值不会改变。
514    #[inline]
515    fn deref(&self) -> &T {
516        // Safety: pin() guarantees pinned_version >= min_active,
517        // and the pointer was captured at pin time.
518        // The value is valid as long as guard is held.
519        // 安全性:pin() 保证 pinned_version >= min_active,
520        // 并且指针在 pin 时被捕获。
521        // 只要 guard 被持有,值就是有效的。
522        unsafe { &*self.ptr }
523    }
524}
525
526impl<'a, T> Clone for PinGuard<'a, T> {
527    /// Clone this guard to create a nested pin.
528    ///
529    /// Cloning increments the pin count, and the thread remains pinned until all cloned guards
530    /// are dropped. This allows multiple scopes to hold pins simultaneously.
531    ///
532    /// 克隆此守卫以创建嵌套 pin。
533    ///
534    /// 克隆会增加 pin 计数,线程保持被钉住直到所有克隆的守卫被 drop。
535    /// 这允许多个作用域同时持有 pin。
536    #[inline]
537    fn clone(&self) -> Self {
538        let pin_count = self.local.pin_count.get();
539
540        assert!(
541            pin_count > 0,
542            "BUG: Cloning a PinGuard in an unpinned state (pin_count = 0). \
543             This indicates incorrect API usage or a library bug."
544        );
545
546        self.local.pin_count.set(pin_count + 1);
547
548        PinGuard {
549            local: self.local,
550            ptr: self.ptr,
551        }
552    }
553}
554
555impl<'a, T> Drop for PinGuard<'a, T> {
556    #[inline]
557    fn drop(&mut self) {
558        let pin_count = self.local.pin_count.get();
559
560        assert!(
561            pin_count > 0,
562            "BUG: Dropping a PinGuard in an unpinned state (pin_count = 0). \
563             This indicates incorrect API usage or a library bug."
564        );
565
566        if pin_count == 1 {
567            self.local
568                .slot
569                .active_version
570                .store(INACTIVE_VERSION, Ordering::Release);
571        }
572
573        self.local.pin_count.set(pin_count - 1);
574    }
575}