Skip to main content

libfreemkv/
halt.rs

1//! One-bit cooperative cancellation flag.
2//!
3//! `Halt` is a clonable token wrapping `Arc<AtomicBool>`. Pass clones into
4//! every long-running loop; the loop polls `is_cancelled()` and bails out
5//! cleanly. Calling `cancel()` from any clone flips the shared flag, and
6//! every other clone observes it on its next poll.
7//!
8//! Why: `Ordering::Relaxed` is sufficient on both load and store because
9//! this flag is purely advisory — no other memory operations piggyback on
10//! it for happens-before ordering. Callers that need to publish data
11//! across threads do so via channels or other synchronization, not via
12//! this bit.
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16
/// Cooperative-cancellation token that is cheap to clone and cannot fail.
///
/// All clones point at one shared flag, so a `cancel()` issued through
/// any of them is seen by every other clone. Cancellation is one-way:
/// there is deliberately no `reset()`. A new operation gets a new
/// `Halt`.
///
/// Build one with [`Halt::new`] or [`Halt::default`]; both hand back a
/// fresh, uncancelled token. The `Default` impl only exists because
/// clippy's `new_without_default` lint asks for it whenever a public
/// `new()` is present, and it simply forwards to `new()`.
#[derive(Debug, Clone)]
pub struct Halt(Arc<AtomicBool>);

impl Halt {
    /// Create a token whose flag starts out uncancelled.
    pub fn new() -> Self {
        Self::from_arc(Arc::new(AtomicBool::new(false)))
    }

    /// Adopt an existing `Arc<AtomicBool>` as the token's flag.
    ///
    /// This is the bridge for the 0.18 deprecation window: code that
    /// already owns an `Arc<AtomicBool>` (e.g. from `Drive::halt_flag()`
    /// or for the deprecated `DiscStream::set_halt`) can switch to the
    /// token API while keeping the very same flag.
    ///
    /// The returned `Halt` and the caller's `Arc` are two handles on one
    /// bit, so setting it through either is visible through the other.
    pub fn from_arc(flag: Arc<AtomicBool>) -> Self {
        Halt(flag)
    }

    /// Borrow the `Arc<AtomicBool>` that backs this token.
    ///
    /// Needed where a pre-`Halt` API still wants the raw
    /// `Arc<AtomicBool>` (`CopyOptions::halt`, the deprecated
    /// `DiscStream::set_halt`). Round 3 removes those call sites, and
    /// this accessor goes with them.
    pub fn as_arc(&self) -> &Arc<AtomicBool> {
        let Halt(flag) = self;
        flag
    }

    /// Mark the shared flag as cancelled. Calling it again is harmless.
    pub fn cancel(&self) {
        let Halt(flag) = self;
        // Relaxed is deliberate: the flag is advisory and publishes no
        // other data (see the module-level docs).
        flag.store(true, Ordering::Relaxed);
    }

    /// Report whether the shared flag has been set.
    pub fn is_cancelled(&self) -> bool {
        let Halt(flag) = self;
        // Relaxed mirrors the store in `cancel()`.
        flag.load(Ordering::Relaxed)
    }
}

impl Default for Halt {
    /// Same as [`Halt::new`]: a fresh, uncancelled token.
    fn default() -> Self {
        Halt::new()
    }
}
73
/// How often halt-aware loops wake up to re-check for cancellation.
///
/// While blocked on a worker, `bounded_syscall` re-examines both the
/// cancellation flag and its deadline once per [`POLL_INTERVAL`];
/// `Pipeline::send_with_halt` retries its `try_send` on the same
/// cadence.
///
/// 250 ms balances two costs. It is short enough that an operator who
/// presses Stop sees it take effect within roughly a quarter of a
/// second, and long enough to avoid polling at thousands of hertz,
/// where an atomic load plus a clock read is cheap but not free.
///
/// The value lives here, in one place, so that the half-dozen
/// halt-polling loops across `io` cannot silently drift apart.
pub const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);
87
#[cfg(test)]
mod tests {
    use super::*;

    /// A token straight out of `new()` reports "not cancelled".
    #[test]
    fn fresh_is_not_cancelled() {
        assert!(!Halt::new().is_cancelled());
    }

    /// `cancel()` moves the token from uncancelled to cancelled.
    #[test]
    fn cancel_flips_state() {
        let token = Halt::new();
        assert!(!token.is_cancelled());
        token.cancel();
        assert!(token.is_cancelled());
    }

    /// A second `cancel()` leaves the token cancelled.
    #[test]
    fn cancel_is_idempotent() {
        let token = Halt::new();
        for _ in 0..2 {
            token.cancel();
        }
        assert!(token.is_cancelled());
    }

    /// Cancelling through a clone is visible through the original.
    #[test]
    fn clone_shares_state() {
        let source = Halt::new();
        let copy = source.clone();
        for token in [&source, &copy] {
            assert!(!token.is_cancelled());
        }

        copy.cancel();
        for token in [&source, &copy] {
            assert!(token.is_cancelled());
        }
    }

    /// Cancelling through the original is visible through a clone.
    #[test]
    fn clone_shares_state_reverse_direction() {
        let source = Halt::new();
        let copy = source.clone();

        source.cancel();
        assert!(copy.is_cancelled());
    }

    /// A clone moved to another thread still drives the same flag.
    #[test]
    fn clone_shares_state_across_threads() {
        let local = Halt::new();
        let remote = local.clone();
        std::thread::spawn(move || remote.cancel()).join().unwrap();
        assert!(local.is_cancelled());
    }

    /// The 0.18 deprecation-window bridge: `from_arc` must produce a
    /// *view* over the caller's bit rather than a fresh copy, so a write
    /// on either side is seen on the other.
    #[test]
    fn from_arc_shares_state() {
        // Direction 1: cancel through the Halt, observe through the Arc.
        {
            let raw = Arc::new(AtomicBool::new(false));
            let token = Halt::from_arc(Arc::clone(&raw));
            assert!(!token.is_cancelled());
            assert!(!raw.load(Ordering::Relaxed));

            token.cancel();
            assert!(raw.load(Ordering::Relaxed));
        }

        // Direction 2: set the Arc directly, observe through the Halt.
        {
            let raw = Arc::new(AtomicBool::new(false));
            let token = Halt::from_arc(Arc::clone(&raw));
            raw.store(true, Ordering::Relaxed);
            assert!(token.is_cancelled());
        }
    }

    /// `as_arc()` must expose the token's own backing flag: a store made
    /// through the borrowed Arc has to show up in `is_cancelled()`.
    #[test]
    fn as_arc_returns_backing_flag() {
        let token = Halt::new();
        let backing = Arc::clone(token.as_arc());
        assert!(!token.is_cancelled());
        backing.store(true, Ordering::Relaxed);
        assert!(token.is_cancelled());
    }
}