almost_enough/
sync_stopper.rs

1//! Synchronized cancellation with memory ordering guarantees.
2//!
3//! [`SyncStopper`] uses Release/Acquire ordering to ensure memory synchronization
4//! between the cancelling thread and threads that observe the cancellation.
5//!
6//! # When to Use
7//!
8//! Use `SyncStopper` when you need to ensure that writes made before `cancel()`
9//! are visible to readers after they see `should_stop() == true`.
10//!
11//! ```rust
12//! use almost_enough::{SyncStopper, Stop};
13//! use std::sync::atomic::{AtomicUsize, Ordering};
14//!
15//! static SHARED_DATA: AtomicUsize = AtomicUsize::new(0);
16//!
17//! let stop = SyncStopper::new();
18//!
19//! // Thread A: producer
20//! SHARED_DATA.store(42, Ordering::Relaxed);
21//! stop.cancel();  // Release: flushes SHARED_DATA write
22//!
23//! // Thread B: consumer (same thread here for demo)
24//! if stop.should_stop() {  // Acquire: syncs with Release
25//!     // GUARANTEED to see SHARED_DATA == 42
26//!     let value = SHARED_DATA.load(Ordering::Relaxed);
27//!     assert_eq!(value, 42);
28//! }
29//! ```
30//!
31//! # When NOT to Use
32//!
33//! If you don't need synchronization guarantees (most cancellation use cases),
34//! use [`Stopper`](crate::Stopper) instead - it's slightly faster on
35//! weakly-ordered architectures (ARM, etc.).
36//!
37//! # Memory Ordering
38//!
39//! | Operation | Ordering | Effect |
40//! |-----------|----------|--------|
41//! | `cancel()` | Release | Flushes prior writes |
42//! | `is_cancelled()` | Acquire | Syncs with Release |
43//! | `should_stop()` | Acquire | Syncs with Release |
44//! | `check()` | Acquire | Syncs with Release |
45
46use alloc::sync::Arc;
47use core::sync::atomic::{AtomicBool, Ordering};
48
49use crate::{Stop, StopReason};
50
51/// A cancellation primitive with Release/Acquire memory ordering.
52///
53/// Unlike [`Stopper`](crate::Stopper) which uses Relaxed ordering,
54/// `SyncStopper` guarantees that all writes before `cancel()` are visible
55/// to any clone that subsequently observes `should_stop() == true`.
56///
57/// # Example
58///
59/// ```rust
60/// use almost_enough::{SyncStopper, Stop};
61///
62/// let stop = SyncStopper::new();
63/// let stop2 = stop.clone();
64///
65/// // In producer thread:
66/// // ... write shared data ...
67/// stop.cancel();  // Release barrier
68///
69/// // In consumer thread:
70/// if stop2.should_stop() {  // Acquire barrier
71///     // Safe to read shared data written before cancel()
72/// }
73/// ```
74///
75/// # Performance
76///
77/// On x86/x64, Release/Acquire has negligible overhead (strong memory model).
78/// On ARM and other weakly-ordered architectures, there's a small cost for
79/// the memory barriers. Use [`Stopper`](crate::Stopper) if you don't
80/// need the synchronization guarantees.
81#[derive(Debug, Clone)]
82pub struct SyncStopper {
83    cancelled: Arc<AtomicBool>,
84}
85
86impl SyncStopper {
87    /// Create a new synchronized stopper.
88    #[inline]
89    pub fn new() -> Self {
90        Self {
91            cancelled: Arc::new(AtomicBool::new(false)),
92        }
93    }
94
95    /// Create a stopper that is already cancelled.
96    #[inline]
97    pub fn cancelled() -> Self {
98        Self {
99            cancelled: Arc::new(AtomicBool::new(true)),
100        }
101    }
102
103    /// Cancel with Release ordering.
104    ///
105    /// All memory writes before this call are guaranteed to be visible
106    /// to any clone that subsequently observes `should_stop() == true`.
107    #[inline]
108    pub fn cancel(&self) {
109        self.cancelled.store(true, Ordering::Release);
110    }
111
112    /// Check if cancelled with Acquire ordering.
113    ///
114    /// If this returns `true`, all memory writes that happened before
115    /// the corresponding `cancel()` call are guaranteed to be visible.
116    #[inline]
117    pub fn is_cancelled(&self) -> bool {
118        self.cancelled.load(Ordering::Acquire)
119    }
120}
121
122impl Default for SyncStopper {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl Stop for SyncStopper {
129    #[inline]
130    fn check(&self) -> Result<(), StopReason> {
131        if self.cancelled.load(Ordering::Acquire) {
132            Err(StopReason::Cancelled)
133        } else {
134            Ok(())
135        }
136    }
137
138    #[inline]
139    fn should_stop(&self) -> bool {
140        self.cancelled.load(Ordering::Acquire)
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::*;
147
148    #[test]
149    fn sync_stopper_basic() {
150        let stop = SyncStopper::new();
151        assert!(!stop.is_cancelled());
152        assert!(!stop.should_stop());
153        assert!(stop.check().is_ok());
154
155        stop.cancel();
156
157        assert!(stop.is_cancelled());
158        assert!(stop.should_stop());
159        assert_eq!(stop.check(), Err(StopReason::Cancelled));
160    }
161
162    #[test]
163    fn sync_stopper_cancelled_constructor() {
164        let stop = SyncStopper::cancelled();
165        assert!(stop.is_cancelled());
166        assert!(stop.should_stop());
167    }
168
169    #[test]
170    fn sync_stopper_clone_shares_state() {
171        let stop1 = SyncStopper::new();
172        let stop2 = stop1.clone();
173
174        assert!(!stop1.should_stop());
175        assert!(!stop2.should_stop());
176
177        stop2.cancel();
178
179        assert!(stop1.should_stop());
180        assert!(stop2.should_stop());
181    }
182
183    #[test]
184    fn sync_stopper_is_default() {
185        let stop: SyncStopper = Default::default();
186        assert!(!stop.is_cancelled());
187    }
188
189    #[test]
190    fn sync_stopper_is_send_sync() {
191        fn assert_send_sync<T: Send + Sync>() {}
192        assert_send_sync::<SyncStopper>();
193    }
194
195    #[test]
196    fn cancel_is_idempotent() {
197        let stop = SyncStopper::new();
198        stop.cancel();
199        stop.cancel();
200        stop.cancel();
201        assert!(stop.is_cancelled());
202    }
203
204    #[cfg(feature = "std")]
205    #[test]
206    fn sync_ordering_guarantees() {
207        use std::sync::atomic::AtomicUsize;
208
209        let stop = SyncStopper::new();
210        let data = AtomicUsize::new(0);
211
212        // This test verifies the ordering semantics compile correctly.
213        // Actual ordering verification would require more complex testing
214        // with tools like loom or ThreadSanitizer.
215
216        // Producer
217        data.store(42, Ordering::Relaxed);
218        stop.cancel(); // Release
219
220        // Consumer (same thread for simplicity)
221        if stop.should_stop() {
222            // Acquire
223            let value = data.load(Ordering::Relaxed);
224            assert_eq!(value, 42);
225        }
226    }
227}