almost_enough/sync_stopper.rs
1//! Synchronized cancellation with memory ordering guarantees.
2//!
3//! [`SyncStopper`] uses Release/Acquire ordering to ensure memory synchronization
4//! between the cancelling thread and threads that observe the cancellation.
5//!
6//! # When to Use
7//!
8//! Use `SyncStopper` when you need to ensure that writes made before `cancel()`
9//! are visible to readers after they see `should_stop() == true`.
10//!
11//! ```rust
12//! use almost_enough::{SyncStopper, Stop};
13//! use std::sync::atomic::{AtomicUsize, Ordering};
14//!
15//! static SHARED_DATA: AtomicUsize = AtomicUsize::new(0);
16//!
17//! let stop = SyncStopper::new();
18//!
19//! // Thread A: producer
20//! SHARED_DATA.store(42, Ordering::Relaxed);
21//! stop.cancel(); // Release: flushes SHARED_DATA write
22//!
23//! // Thread B: consumer (same thread here for demo)
24//! if stop.should_stop() { // Acquire: syncs with Release
25//! // GUARANTEED to see SHARED_DATA == 42
26//! let value = SHARED_DATA.load(Ordering::Relaxed);
27//! assert_eq!(value, 42);
28//! }
29//! ```
30//!
31//! # When NOT to Use
32//!
33//! If you don't need synchronization guarantees (most cancellation use cases),
34//! use [`Stopper`](crate::Stopper) instead - it's slightly faster on
35//! weakly-ordered architectures (ARM, etc.).
36//!
37//! # Memory Ordering
38//!
39//! | Operation | Ordering | Effect |
40//! |-----------|----------|--------|
41//! | `cancel()` | Release | Flushes prior writes |
42//! | `is_cancelled()` | Acquire | Syncs with Release |
43//! | `should_stop()` | Acquire | Syncs with Release |
44//! | `check()` | Acquire | Syncs with Release |
45
46use alloc::sync::Arc;
47use core::sync::atomic::{AtomicBool, Ordering};
48
49use crate::{Stop, StopReason};
50
/// Inner state for [`SyncStopper`] — implements [`Stop`] with Acquire ordering.
///
/// Shared behind an `Arc` by every clone of a [`SyncStopper`]. The flag is
/// write-once within this module: it only ever transitions `false → true`
/// (via `SyncStopper::cancel` or the `cancelled()` constructor) and is
/// never reset.
pub(crate) struct SyncStopperInner {
    // Stored with Release by `SyncStopper::cancel`; loaded with Acquire by
    // `check`/`should_stop`/`is_cancelled` so readers synchronize with the
    // cancelling thread's prior writes.
    cancelled: AtomicBool,
}
55
56impl Stop for SyncStopperInner {
57 #[inline]
58 fn check(&self) -> Result<(), StopReason> {
59 if self.cancelled.load(Ordering::Acquire) {
60 Err(StopReason::Cancelled)
61 } else {
62 Ok(())
63 }
64 }
65
66 #[inline]
67 fn should_stop(&self) -> bool {
68 self.cancelled.load(Ordering::Acquire)
69 }
70}
71
/// A cancellation primitive with Release/Acquire memory ordering.
///
/// Unlike [`Stopper`](crate::Stopper) which uses Relaxed ordering,
/// `SyncStopper` guarantees that all writes before `cancel()` are visible
/// to any clone that subsequently observes `should_stop() == true`.
///
/// Converts to [`StopToken`](crate::StopToken) via `From`/`Into` with zero
/// overhead — the existing `Arc` is reused, not double-wrapped.
///
/// # Performance
///
/// On x86/x64, Release/Acquire has negligible overhead (strong memory model).
/// On ARM and other weakly-ordered architectures, there's a small cost for
/// the memory barriers. Use [`Stopper`](crate::Stopper) if you don't
/// need the synchronization guarantees.
#[derive(Debug, Clone)]
pub struct SyncStopper {
    // Shared state: `Clone` (derived) bumps the `Arc` refcount, so every
    // clone observes the same flag. `pub(crate)` presumably so the
    // `From`/`Into` conversion to `StopToken` mentioned above (defined
    // outside this file) can reuse this `Arc` directly — confirm at the
    // conversion site.
    pub(crate) inner: Arc<SyncStopperInner>,
}
91
92impl SyncStopper {
93 /// Create a new synchronized stopper.
94 #[inline]
95 pub fn new() -> Self {
96 Self {
97 inner: Arc::new(SyncStopperInner {
98 cancelled: AtomicBool::new(false),
99 }),
100 }
101 }
102
103 /// Create a stopper that is already cancelled.
104 #[inline]
105 pub fn cancelled() -> Self {
106 Self {
107 inner: Arc::new(SyncStopperInner {
108 cancelled: AtomicBool::new(true),
109 }),
110 }
111 }
112
113 /// Cancel with Release ordering.
114 ///
115 /// All memory writes before this call are guaranteed to be visible
116 /// to any clone that subsequently observes `should_stop() == true`.
117 #[inline]
118 pub fn cancel(&self) {
119 self.inner.cancelled.store(true, Ordering::Release);
120 }
121
122 /// Check if cancelled with Acquire ordering.
123 ///
124 /// If this returns `true`, all memory writes that happened before
125 /// the corresponding `cancel()` call are guaranteed to be visible.
126 #[inline]
127 pub fn is_cancelled(&self) -> bool {
128 self.inner.cancelled.load(Ordering::Acquire)
129 }
130}
131
132impl Default for SyncStopper {
133 fn default() -> Self {
134 Self::new()
135 }
136}
137
138impl Stop for SyncStopper {
139 #[inline]
140 fn check(&self) -> Result<(), StopReason> {
141 self.inner.check()
142 }
143
144 #[inline]
145 fn should_stop(&self) -> bool {
146 self.inner.should_stop()
147 }
148}
149
150impl core::fmt::Debug for SyncStopperInner {
151 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
152 f.debug_struct("SyncStopperInner")
153 .field("cancelled", &self.cancelled.load(Ordering::Relaxed))
154 .finish()
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sync_stopper_basic() {
        let stopper = SyncStopper::new();

        // A fresh stopper reports "running" through every accessor.
        assert!(!stopper.is_cancelled());
        assert!(!stopper.should_stop());
        assert!(stopper.check().is_ok());

        stopper.cancel();

        // After cancellation, every accessor agrees.
        assert!(stopper.is_cancelled());
        assert!(stopper.should_stop());
        assert_eq!(stopper.check(), Err(StopReason::Cancelled));
    }

    #[test]
    fn sync_stopper_cancelled_constructor() {
        let stopper = SyncStopper::cancelled();
        assert!(stopper.is_cancelled());
        assert!(stopper.should_stop());
    }

    #[test]
    fn sync_stopper_clone_shares_state() {
        let original = SyncStopper::new();
        let duplicate = original.clone();

        // Cancelling through one handle is visible from the other.
        duplicate.cancel();

        assert!(original.should_stop());
        assert!(duplicate.should_stop());
    }

    #[test]
    fn sync_stopper_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<SyncStopper>();
    }

    #[test]
    fn sync_stopper_is_default() {
        assert!(!SyncStopper::default().is_cancelled());
    }
}
205}