almost_enough/sync_stopper.rs
1//! Synchronized cancellation with memory ordering guarantees.
2//!
3//! [`SyncStopper`] uses Release/Acquire ordering to ensure memory synchronization
4//! between the cancelling thread and threads that observe the cancellation.
5//!
6//! # When to Use
7//!
8//! Use `SyncStopper` when you need to ensure that writes made before `cancel()`
9//! are visible to readers after they see `should_stop() == true`.
10//!
11//! ```rust
12//! use almost_enough::{SyncStopper, Stop};
13//! use std::sync::atomic::{AtomicUsize, Ordering};
14//!
15//! static SHARED_DATA: AtomicUsize = AtomicUsize::new(0);
16//!
17//! let stop = SyncStopper::new();
18//!
19//! // Thread A: producer
20//! SHARED_DATA.store(42, Ordering::Relaxed);
21//! stop.cancel(); // Release: flushes SHARED_DATA write
22//!
23//! // Thread B: consumer (same thread here for demo)
24//! if stop.should_stop() { // Acquire: syncs with Release
25//! // GUARANTEED to see SHARED_DATA == 42
26//! let value = SHARED_DATA.load(Ordering::Relaxed);
27//! assert_eq!(value, 42);
28//! }
29//! ```
30//!
31//! # When NOT to Use
32//!
33//! If you don't need synchronization guarantees (most cancellation use cases),
34//! use [`Stopper`](crate::Stopper) instead - it's slightly faster on
35//! weakly-ordered architectures (ARM, etc.).
36//!
37//! # Memory Ordering
38//!
//! | Operation | Ordering | Effect |
//! |-----------|----------|--------|
//! | `cancel()` | Release | Publishes every write made before the call |
//! | `is_cancelled()` | Acquire | Seeing `true` synchronizes with the `cancel()` that set it |
//! | `should_stop()` | Acquire | Seeing `true` synchronizes with the `cancel()` that set it |
//! | `check()` | Acquire | Seeing `Err` synchronizes with the `cancel()` that set it |
45
46use alloc::sync::Arc;
47use core::sync::atomic::{AtomicBool, Ordering};
48
49use crate::{Stop, StopReason};
50
/// Shared cancellation flag that publishes memory with Release/Acquire ordering.
///
/// [`Stopper`](crate::Stopper) uses Relaxed ordering. `SyncStopper` instead
/// guarantees that every write made before `cancel()` is visible to any clone
/// that afterwards observes `should_stop() == true`.
///
/// Cloning is cheap: all clones point at the same underlying flag, so
/// cancelling through one clone is observed by every other clone.
///
/// # Example
///
/// ```rust
/// use almost_enough::{SyncStopper, Stop};
///
/// let stop = SyncStopper::new();
/// let stop2 = stop.clone();
///
/// // Producer side:
/// // ... write shared data ...
/// stop.cancel(); // Release store
///
/// // Consumer side:
/// if stop2.should_stop() { // Acquire load
///     // Data written before cancel() can be read safely here.
/// }
/// ```
///
/// # Performance
///
/// On x86/x64 (strong memory model) Release/Acquire has negligible overhead.
/// On ARM and other weakly-ordered architectures the barriers carry a small
/// cost. Prefer [`Stopper`](crate::Stopper) when the synchronization
/// guarantee is not needed.
#[derive(Clone, Debug)]
pub struct SyncStopper {
    /// Flag shared by all clones; `true` once cancellation was requested.
    cancelled: Arc<AtomicBool>,
}
85
86impl SyncStopper {
87 /// Create a new synchronized stopper.
88 #[inline]
89 pub fn new() -> Self {
90 Self {
91 cancelled: Arc::new(AtomicBool::new(false)),
92 }
93 }
94
95 /// Create a stopper that is already cancelled.
96 #[inline]
97 pub fn cancelled() -> Self {
98 Self {
99 cancelled: Arc::new(AtomicBool::new(true)),
100 }
101 }
102
103 /// Cancel with Release ordering.
104 ///
105 /// All memory writes before this call are guaranteed to be visible
106 /// to any clone that subsequently observes `should_stop() == true`.
107 #[inline]
108 pub fn cancel(&self) {
109 self.cancelled.store(true, Ordering::Release);
110 }
111
112 /// Check if cancelled with Acquire ordering.
113 ///
114 /// If this returns `true`, all memory writes that happened before
115 /// the corresponding `cancel()` call are guaranteed to be visible.
116 #[inline]
117 pub fn is_cancelled(&self) -> bool {
118 self.cancelled.load(Ordering::Acquire)
119 }
120}
121
122impl Default for SyncStopper {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl Stop for SyncStopper {
129 #[inline]
130 fn check(&self) -> Result<(), StopReason> {
131 if self.cancelled.load(Ordering::Acquire) {
132 Err(StopReason::Cancelled)
133 } else {
134 Ok(())
135 }
136 }
137
138 #[inline]
139 fn should_stop(&self) -> bool {
140 self.cancelled.load(Ordering::Acquire)
141 }
142}
143
#[cfg(test)]
mod tests {
    // Unit tests for `SyncStopper`: state transitions, clone sharing, trait
    // bounds, and (with the `std` feature) cross-thread publication.
    use super::*;

    /// A new stopper reports "not cancelled" through every accessor, and
    /// "cancelled" through every accessor after `cancel()`.
    #[test]
    fn sync_stopper_basic() {
        let stop = SyncStopper::new();
        assert!(!stop.is_cancelled());
        assert!(!stop.should_stop());
        assert!(stop.check().is_ok());

        stop.cancel();

        assert!(stop.is_cancelled());
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// `SyncStopper::cancelled()` starts in the cancelled state for all
    /// three accessors.
    #[test]
    fn sync_stopper_cancelled_constructor() {
        let stop = SyncStopper::cancelled();
        assert!(stop.is_cancelled());
        assert!(stop.should_stop());
        assert_eq!(stop.check(), Err(StopReason::Cancelled));
    }

    /// Cancelling through one clone is observed through the other.
    #[test]
    fn sync_stopper_clone_shares_state() {
        let stop1 = SyncStopper::new();
        let stop2 = stop1.clone();

        assert!(!stop1.should_stop());
        assert!(!stop2.should_stop());

        stop2.cancel();

        assert!(stop1.should_stop());
        assert!(stop2.should_stop());
    }

    /// `Default` yields a stopper that is not cancelled.
    #[test]
    fn sync_stopper_is_default() {
        let stop: SyncStopper = Default::default();
        assert!(!stop.is_cancelled());
        assert!(!stop.should_stop());
        assert!(stop.check().is_ok());
    }

    /// Compile-time check that the type can be shared across threads.
    #[test]
    fn sync_stopper_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<SyncStopper>();
    }

    /// Repeated `cancel()` calls leave the stopper cancelled.
    #[test]
    fn cancel_is_idempotent() {
        let stop = SyncStopper::new();
        stop.cancel();
        stop.cancel();
        stop.cancel();
        assert!(stop.is_cancelled());
    }

    /// A write made before `cancel()` on one thread is visible on another
    /// thread once that thread observes `should_stop() == true`.
    ///
    /// This exercises the real producer/consumer shape. It cannot prove the
    /// ordering on strongly-ordered hardware; exhaustive verification still
    /// needs a tool such as loom or ThreadSanitizer.
    #[cfg(feature = "std")]
    #[test]
    fn sync_ordering_guarantees() {
        use std::sync::atomic::AtomicUsize;
        use std::thread;

        let stop = SyncStopper::new();
        let data = Arc::new(AtomicUsize::new(0));

        // Producer: Relaxed write to the payload, then Release via cancel().
        let producer = {
            let stop = stop.clone();
            let data = Arc::clone(&data);
            thread::spawn(move || {
                data.store(42, Ordering::Relaxed);
                stop.cancel();
            })
        };

        // Consumer: wait for the Acquire load to observe the cancellation.
        while !stop.should_stop() {
            thread::yield_now();
        }

        // Asserted unconditionally: the payload must be visible once the
        // cancellation has been observed.
        assert_eq!(data.load(Ordering::Relaxed), 42);

        producer.join().expect("producer thread panicked");
    }
}