swmr_cell/lib.rs
1//! # SWMR Version-Based Single Object
2//!
3//! This crate provides a single-writer, multi-reader (SWMR) cell that supports
4//! concurrent wait-free reads and lock-free writes using version-based garbage collection.
5//!
6//! ## Core Concepts
7//!
8//! - **Single Object**: The `swmr_cell` library manages a single versioned object per `SwmrCell`.
9//! - **Version**: The version counter represents the state of the object. Each write increments the version.
10//! - **Pinning**: Readers pin the current version when they start reading, preventing the writer from reclaiming that version (and any older versions still visible to other readers) until they are done.
11//!
12//! ## Typical Usage
13//!
14//! ```rust
15//! use swmr_cell::SwmrCell;
16//!
17//! // 1. Create a new SWMR cell with an initial value
18//! let mut cell = SwmrCell::new(42i32);
19//!
20//! // 2. Create a local reader for this thread (or pass to another thread)
21//! let local = cell.local();
22//!
23//! // 3. Pin and read the value by dereferencing the guard
24//! let guard = local.pin();
25//! assert_eq!(*guard, 42);
26//! drop(guard);
27//!
28//! // 4. Writer updates the value
29//! cell.store(100i32);
30//!
31//! // 5. Read the new value
32//! let guard = local.pin();
33//! assert_eq!(*guard, 100);
34//! drop(guard);
35//!
36//! // 6. Manually collect garbage (optional, happens automatically too)
37//! cell.collect();
38//! ```
39mod sync;
40
41#[cfg(test)]
42mod tests;
43
44use crate::sync::*;
45use std::{collections::VecDeque, marker::PhantomData, ops::Deref, vec::Vec};
46
47/// Default threshold for automatic garbage reclamation (count of retired nodes).
48/// 自动垃圾回收的默认阈值(已退休节点的数量)。
49pub(crate) const AUTO_RECLAIM_THRESHOLD: usize = 16;
50
51/// Represents a reader that is not currently pinned to any version.
52/// 表示当前未被钉住到任何版本的读者。
53pub(crate) const INACTIVE_VERSION: usize = usize::MAX;
54
55
56/// A single-writer, multi-reader cell with version-based garbage collection.
57///
58/// `SwmrCell` provides safe concurrent access where one writer can update the value
59/// and multiple readers can read it concurrently. Readers access the value by
60/// creating a `LocalReader` and pinning it.
61///
62/// 单写多读单元,带有基于版本的垃圾回收。
63///
64/// `SwmrCell` 提供安全的并发访问,其中一个写入者可以更新值,
65/// 多个读者可以并发读取它。读者通过创建 `LocalReader` 并 pin 来访问值。
66pub struct SwmrCell<T: 'static> {
67 shared: Arc<SharedState>,
68 ptr: Arc<AtomicPtr<T>>,
69 garbage: GarbageSet<T>,
70 auto_reclaim_threshold: Option<usize>,
71}
72
73impl<T: 'static> SwmrCell<T> {
74 /// Create a new SWMR cell with default settings and the given initial value.
75 ///
76 /// 使用默认设置和给定的初始值创建一个新的 SWMR 单元。
77 #[inline]
78 pub fn new(data: T) -> Self {
79 Self::builder().build(data)
80 }
81
82 /// Returns a builder for configuring the SWMR cell.
83 ///
84 /// 返回用于配置 SWMR 单元的构建器。
85 #[inline]
86 pub fn builder() -> SwmrCellBuilder<T> {
87 SwmrCellBuilder {
88 auto_reclaim_threshold: Some(AUTO_RECLAIM_THRESHOLD),
89 marker: PhantomData::default()
90 }
91 }
92
93 /// Create a new `LocalReader` for reading.
94 ///
95 /// Each thread should create its own `LocalReader` and reuse it.
96 /// `LocalReader` is `!Sync` and should not be shared between threads.
97 ///
98 /// 创建一个新的 `LocalReader` 用于读取。
99 /// 每个线程应该创建自己的 `LocalReader` 并重复使用。
100 /// `LocalReader` 是 `!Sync` 的,不应在线程之间共享。
101 #[inline]
102 pub fn local(&self) -> LocalReader<T> {
103 LocalReader::new(self.shared.clone(), self.ptr.clone())
104 }
105
106 /// Store a new value, making it visible to readers.
107 /// The old value is retired and will be garbage collected.
108 ///
109 /// This operation increments the global version.
110 ///
111 /// 存储新值,使其对读者可见。
112 /// 旧值已退休,将被垃圾回收。
113 /// 此操作会增加全局版本。
114 pub fn store(&mut self, data: T) {
115 let new_ptr = Box::into_raw(Box::new(data));
116 let old_ptr = self.ptr.swap(new_ptr, Ordering::Release);
117
118 // Increment global version.
119 // The old value belongs to the previous version (the one before this increment).
120 // 增加全局版本。
121 // 旧值属于前一个版本(此次增加之前的那个)。
122 let old_version = self.shared.global_version.fetch_add(1, Ordering::AcqRel);
123
124 if !old_ptr.is_null() {
125 // Safe because we just swapped it out and we own the writer
126 unsafe {
127 self.garbage.add(Box::from_raw(old_ptr), old_version);
128 }
129 }
130
131 // Auto-reclaim
132 if let Some(threshold) = self.auto_reclaim_threshold {
133 if self.garbage.len() > threshold {
134 self.collect();
135 }
136 }
137 }
138
139 /// Manually trigger garbage collection.
140 /// 手动触发垃圾回收。
141 pub fn collect(&mut self) {
142 // In this design, we don't necessarily advance the version just for collection.
143 // But we need to find min_active_version.
144
145 let current_version = self.shared.global_version.load(Ordering::Acquire);
146
147 let safety_limit = current_version.saturating_sub(2);
148
149 let mut min_active = current_version;
150
151 let mut shared_readers = self.shared.readers.lock();
152
153 for arc_slot in shared_readers.iter() {
154 let version = arc_slot.active_version.load(Ordering::Acquire);
155 if version != INACTIVE_VERSION {
156 min_active = min_active.min(version);
157 }
158 }
159
160 // Clean up dead reader slots (strong_count == 1 means only SharedState holds it)
161 // 清理死读者槽(strong_count == 1 表示只有 SharedState 持有它)
162 shared_readers.retain(|arc_slot| Arc::strong_count(arc_slot) > 1);
163
164 drop(shared_readers);
165
166 let reclaim_threshold = min_active.min(safety_limit);
167
168 self.shared.min_active_version.store(reclaim_threshold, Ordering::Release);
169
170 self.garbage.collect(reclaim_threshold, current_version);
171 }
172}
173
174
175/// A builder for configuring and creating a SWMR cell.
176///
177/// 用于配置和创建 SWMR 单元的构建器。
178pub struct SwmrCellBuilder<T> {
179 auto_reclaim_threshold: Option<usize>,
180 marker: PhantomData<T>
181}
182
183impl<T: 'static> SwmrCellBuilder<T> {
184 /// Sets the threshold for automatic garbage reclamation.
185 ///
186 /// When the number of retired objects exceeds this threshold,
187 /// garbage collection is triggered automatically during `store`.
188 ///
189 /// Set to `None` to disable automatic reclamation.
190 /// Default is `Some(64)`.
191 ///
192 /// 设置自动垃圾回收的阈值。
193 /// 当已退休对象的数量超过此阈值时,将在 `store` 期间自动触发垃圾回收。
194 /// 设置为 `None` 以禁用自动回收。
195 /// 默认为 `Some(64)`。
196 #[inline]
197 pub fn auto_reclaim_threshold(mut self, threshold: Option<usize>) -> Self {
198 self.auto_reclaim_threshold = threshold;
199 self
200 }
201
202 /// Creates a new SWMR cell with the configured settings and initial value.
203 ///
204 /// 使用配置的设置和初始值创建一个新的 SWMR 单元。
205 pub fn build(self, data: T) -> SwmrCell<T> {
206 let shared = Arc::new(SharedState {
207 global_version: AtomicUsize::new(0),
208 min_active_version: AtomicUsize::new(0),
209 readers: Mutex::new(Vec::new()),
210 });
211
212 let ptr = Arc::new(AtomicPtr::new(Box::into_raw(Box::new(data))));
213
214 SwmrCell {
215 shared,
216 ptr,
217 garbage: GarbageSet::new(),
218 auto_reclaim_threshold: self.auto_reclaim_threshold,
219 }
220 }
221}
222
223
224
225/// Manages retired objects and their reclamation.
226///
227/// This struct encapsulates the logic for:
228/// - Storing retired objects in version-ordered queue.
229/// - Reclaiming objects when they are safe to delete.
230///
231/// 管理已退休对象及其回收。
232///
233/// 此结构体封装了以下逻辑:
234/// - 将已退休对象存储在按版本排序的队列中。
235/// - 当对象可以安全删除时进行回收。
236struct GarbageSet<T> {
237 /// Queue of garbage items, ordered by version.
238 /// Each element is (version, node).
239 queue: VecDeque<(usize, Box<T>)>,
240}
241
242impl<T> GarbageSet<T> {
243 /// Create a new empty garbage set.
244 /// 创建一个新的空垃圾集合。
245 fn new() -> Self {
246 Self {
247 queue: VecDeque::new(),
248 }
249 }
250
251 /// Get the total number of retired objects.
252 /// 获取已退休对象的总数。
253 #[inline]
254 fn len(&self) -> usize {
255 self.queue.len()
256 }
257
258 /// Add a retired node to the set for the current version.
259 ///
260 /// 将已退休节点添加到当前版本的集合中。
261 #[inline]
262 fn add(&mut self, node: Box<T>, current_version: usize) {
263 self.queue.push_back((current_version, node));
264 }
265
266 /// Reclaim garbage that is safe to delete.
267 ///
268 /// Garbage from versions older than `min_active_version` is dropped.
269 ///
270 /// 回收可以安全删除的垃圾。
271 ///
272 /// 来自比 `min_active_version` 更旧的版本的垃圾将被 drop。
273 #[inline]
274 fn collect(&mut self, min_active_version: usize, _current_version: usize) {
275 // We reclaim everything that is strictly older than min_active_version.
276 // If min_active_version == current_version, then everything (all < current_version) is reclaimed.
277 while let Some((version, _)) = self.queue.front() {
278 if *version >= min_active_version {
279 break;
280 }
281 self.queue.pop_front(); // Box<T> is dropped here
282 }
283 }
284}
285
286
287
288/// A slot allocated for a reader thread to record its active version.
289///
290/// Cache-aligned to prevent false sharing between readers.
291///
292/// 为读者线程分配的槽,用于记录其活跃版本。
293/// 缓存对齐以防止读者之间的伪共享。
294#[derive(Debug)]
295#[repr(align(64))]
296pub(crate) struct ReaderSlot {
297 /// The version currently being accessed by the reader, or INACTIVE_VERSION.
298 /// 读者当前访问的版本,或 INACTIVE_VERSION。
299 pub(crate) active_version: AtomicUsize,
300}
301
302/// Global shared state for the version GC domain.
303///
304/// Contains the global version, the minimum active version, and the list of reader slots.
305///
306/// version GC 域的全局共享状态。
307/// 包含全局版本、最小活跃版本和读者槽列表。
308#[derive(Debug)]
309#[repr(align(64))]
310pub(crate) struct SharedState {
311 /// The global monotonic version counter.
312 /// 全局单调版本计数器。
313 pub(crate) global_version: AtomicUsize,
314 /// The minimum version among all active readers (cached for performance).
315 /// 所有活跃读者中的最小版本(为性能而缓存)。
316 pub(crate) min_active_version: AtomicUsize,
317 /// List of all registered reader slots. Protected by a Mutex.
318 /// 所有注册读者槽的列表。由 Mutex 保护。
319 pub(crate) readers: Mutex<Vec<Arc<ReaderSlot>>>,
320}
321
322/// A reader thread's local version state.
323///
324/// Each reader thread should create exactly one `LocalReader` via `SwmrCell::local()`.
325/// It is `!Sync` (due to `Cell`) and must be stored per-thread.
326///
327/// The `LocalReader` is used to:
328/// - Pin the thread to the current version via `pin()`.
329/// - Obtain a `PinGuard` that protects access to values and can be dereferenced.
330///
331/// **Thread Safety**: `LocalReader` is not `Sync` and must be used by only one thread.
332///
333/// 读者线程的本地版本状态。
334/// 每个读者线程应该通过 `SwmrCell::local()` 创建恰好一个 `LocalReader`。
335/// 它是 `!Sync` 的(因为 `Cell`),必须在每个线程中存储。
336/// `LocalReader` 用于:
337/// - 通过 `pin()` 将线程钉住到当前版本。
338/// - 获取保护对值访问的 `PinGuard`,可以解引用来读取值。
339/// **线程安全性**:`LocalReader` 不是 `Sync` 的,必须仅由一个线程使用。
340pub struct LocalReader<T: 'static> {
341 slot: Arc<ReaderSlot>,
342 shared: Arc<SharedState>,
343 ptr: Arc<AtomicPtr<T>>,
344 pin_count: Cell<usize>,
345}
346
347impl<T: 'static> LocalReader<T> {
348 fn new(shared: Arc<SharedState>, ptr: Arc<AtomicPtr<T>>) -> Self {
349 let slot = Arc::new(ReaderSlot {
350 active_version: AtomicUsize::new(INACTIVE_VERSION),
351 });
352
353 // Register the reader immediately in the shared readers list
354 shared.readers.lock().push(Arc::clone(&slot));
355
356 LocalReader {
357 slot,
358 shared,
359 ptr,
360 pin_count: Cell::new(0),
361 }
362 }
363
364 /// Pin this thread to the current version.
365 ///
366 /// Returns a `PinGuard` that keeps the thread pinned for its lifetime.
367 /// The guard can be dereferenced to access the current value.
368 ///
369 /// **Reentrancy**: This method is reentrant. Multiple calls can be nested, and the thread
370 /// remains pinned until all returned guards are dropped. You can also clone a guard to create
371 /// additional references: `let guard2 = guard1.clone();`
372 ///
373 /// **Example**:
374 /// ```ignore
375 /// let local = cell.local();
376 /// let guard1 = local.pin();
377 /// let value = *guard1; // Dereference to read
378 /// let guard2 = local.pin(); // Reentrant call
379 /// let guard3 = guard1.clone(); // Clone for nested scope
380 /// // Thread remains pinned until all three guards are dropped
381 /// ```
382 ///
383 /// While pinned, the thread is considered "active" at a particular version,
384 /// and the garbage collector will not reclaim data from that version.
385 ///
386 /// 将此线程钉住到当前版本。
387 ///
388 /// 返回一个 `PinGuard`,在其生命周期内保持线程被钉住。
389 /// 可以解引用该守卫来访问当前值。
390 ///
391 /// **可重入性**:此方法是可重入的。多个调用可以嵌套,线程在所有返回的守卫被 drop 之前保持被钉住。
392 /// 你也可以克隆一个守卫来创建额外的引用:`let guard2 = guard1.clone();`
393 ///
394 /// **示例**:
395 /// ```ignore
396 /// let local = cell.local();
397 /// let guard1 = local.pin();
398 /// let value = *guard1; // 解引用来读取
399 /// let guard2 = local.pin(); // 可重入调用
400 /// let guard3 = guard1.clone(); // 克隆用于嵌套作用域
401 /// // 线程保持被钉住直到所有三个守卫被 drop
402 /// ```
403 ///
404 /// 当被钉住时,线程被认为在特定版本"活跃",垃圾回收器不会回收该版本的数据。
405 #[inline]
406 pub fn pin(&self) -> PinGuard<'_, T> {
407 let pin_count = self.pin_count.get();
408
409 // Reentrant pin: the version is already protected by the outer pin.
410 // Just increment count and reuse the existing pinned pointer.
411 // 可重入 pin:版本已经被外层 pin 保护。
412 // 只需增加计数并复用现有的 pinned 指针。
413 if pin_count > 0 {
414 self.pin_count.set(pin_count + 1);
415
416 // Load the pointer that corresponds to our already-pinned version.
417 // Since we're reentrant, we should see the same or newer pointer.
418 // 加载与我们已 pin 版本对应的指针。
419 // 由于是可重入的,我们应该看到相同或更新的指针。
420 let ptr = self.ptr.load(Ordering::Acquire);
421
422 return PinGuard { local: self, ptr };
423 }
424
425 // First pin: need to acquire a version and validate it.
426 // 首次 pin:需要获取版本并验证。
427 loop {
428 let current_version = self.shared.global_version.load(Ordering::Acquire);
429
430 self.slot
431 .active_version
432 .store(current_version, Ordering::Release);
433
434 // Check if our version is still valid (not yet reclaimed).
435 // 检查我们的版本是否仍然有效(尚未被回收)。
436 let min_active = self.shared.min_active_version.load(Ordering::Acquire);
437
438 if current_version >= min_active {
439 break;
440 }
441
442 // Version was reclaimed between our read and store.
443 // Retry with a fresh version.
444 // 版本在我们读取和存储之间被回收了。
445 // 用新版本重试。
446 std::hint::spin_loop();
447 }
448
449 self.pin_count.set(1);
450
451 // Capture the pointer at pin time for snapshot semantics.
452 // 在 pin 时捕获指针以实现快照语义。
453 let ptr = self.ptr.load(Ordering::Acquire);
454
455 PinGuard { local: self, ptr }
456 }
457}
458
459impl<T: 'static> Clone for LocalReader<T> {
460 #[inline]
461 fn clone(&self) -> Self {
462 Self::new(self.shared.clone(), self.ptr.clone())
463 }
464}
465
466/// A guard that keeps the current thread pinned to a version.
467///
468/// `PinGuard` is obtained by calling `LocalReader::pin()`.
469/// It implements `Deref<Target = T>` to allow reading the current value.
470/// It is `!Send` and `!Sync` because it references a `!Sync` `LocalReader`.
471/// Its lifetime is bound to the `LocalReader` it came from.
472///
473/// While a `PinGuard` is held, the thread is considered "active" at a particular version,
474/// and the garbage collector will not reclaim data from that version.
475///
476/// `PinGuard` supports internal cloning via reference counting (increments the pin count),
477/// allowing nested pinning. The thread remains pinned until all cloned guards are dropped.
478///
479/// **Safety**: The `PinGuard` is the mechanism that ensures safe concurrent access to
480/// shared values. Readers must always hold a valid `PinGuard` when accessing
481/// shared data.
482///
483/// 一个保持当前线程被钉住到一个版本的守卫。
484/// `PinGuard` 通过调用 `LocalReader::pin()` 获得。
485/// 它实现了 `Deref<Target = T>`,允许读取当前值。
486/// 它是 `!Send` 和 `!Sync` 的,因为它引用了一个 `!Sync` 的 `LocalReader`。
487/// 它的生命周期被绑定到它来自的 `LocalReader`。
488/// 当 `PinGuard` 被持有时,线程被认为在特定版本"活跃",
489/// 垃圾回收器不会回收该版本的数据。
490/// `PinGuard` 支持通过引用计数的内部克隆(增加 pin 计数),允许嵌套 pinning。
491/// 线程保持被钉住直到所有克隆的守卫被 drop。
492/// **安全性**:`PinGuard` 是确保对值安全并发访问的机制。
493/// 读者在访问共享数据时必须始终持有有效的 `PinGuard`。
494#[must_use]
495pub struct PinGuard<'a, T: 'static> {
496 local: &'a LocalReader<T>,
497 /// The pointer captured at pin time for snapshot semantics.
498 /// 在 pin 时捕获的指针,用于快照语义。
499 ptr: *const T,
500}
501
502impl<'a, T> Deref for PinGuard<'a, T> {
503 type Target = T;
504
505 /// Dereference to access the pinned value.
506 ///
507 /// Returns a reference to the value that was current when this guard was created.
508 /// This provides snapshot semantics - the value won't change during the guard's lifetime.
509 ///
510 /// 解引用以访问被 pin 的值。
511 ///
512 /// 返回对创建此守卫时当前值的引用。
513 /// 这提供了快照语义 - 在守卫的生命周期内值不会改变。
514 #[inline]
515 fn deref(&self) -> &T {
516 // Safety: pin() guarantees pinned_version >= min_active,
517 // and the pointer was captured at pin time.
518 // The value is valid as long as guard is held.
519 // 安全性:pin() 保证 pinned_version >= min_active,
520 // 并且指针在 pin 时被捕获。
521 // 只要 guard 被持有,值就是有效的。
522 unsafe { &*self.ptr }
523 }
524}
525
526impl<'a, T> Clone for PinGuard<'a, T> {
527 /// Clone this guard to create a nested pin.
528 ///
529 /// Cloning increments the pin count, and the thread remains pinned until all cloned guards
530 /// are dropped. This allows multiple scopes to hold pins simultaneously.
531 ///
532 /// 克隆此守卫以创建嵌套 pin。
533 ///
534 /// 克隆会增加 pin 计数,线程保持被钉住直到所有克隆的守卫被 drop。
535 /// 这允许多个作用域同时持有 pin。
536 #[inline]
537 fn clone(&self) -> Self {
538 let pin_count = self.local.pin_count.get();
539
540 assert!(
541 pin_count > 0,
542 "BUG: Cloning a PinGuard in an unpinned state (pin_count = 0). \
543 This indicates incorrect API usage or a library bug."
544 );
545
546 self.local.pin_count.set(pin_count + 1);
547
548 PinGuard {
549 local: self.local,
550 ptr: self.ptr,
551 }
552 }
553}
554
555impl<'a, T> Drop for PinGuard<'a, T> {
556 #[inline]
557 fn drop(&mut self) {
558 let pin_count = self.local.pin_count.get();
559
560 assert!(
561 pin_count > 0,
562 "BUG: Dropping a PinGuard in an unpinned state (pin_count = 0). \
563 This indicates incorrect API usage or a library bug."
564 );
565
566 if pin_count == 1 {
567 self.local
568 .slot
569 .active_version
570 .store(INACTIVE_VERSION, Ordering::Release);
571 }
572
573 self.local.pin_count.set(pin_count - 1);
574 }
575}