libfreemkv/halt.rs
1//! One-bit cooperative cancellation flag.
2//!
3//! `Halt` is a clonable token wrapping `Arc<AtomicBool>`. Pass clones into
4//! every long-running loop; the loop polls `is_cancelled()` and bails out
5//! cleanly. Calling `cancel()` from any clone flips the shared flag, and
6//! every other clone observes it on its next poll.
7//!
8//! Why: `Ordering::Relaxed` is sufficient on both load and store because
9//! this flag is purely advisory — no other memory operations piggyback on
10//! it for happens-before ordering. Callers that need to publish data
11//! across threads do so via channels or other synchronization, not via
12//! this bit.
13
14use std::sync::Arc;
15use std::sync::atomic::{AtomicBool, Ordering};
16
/// Clonable, infallible cooperative-cancellation token.
///
/// Every clone is a handle onto the same underlying flag. `cancel()` is
/// one-way; there is deliberately no `reset()` — construct a fresh `Halt`
/// for a fresh operation.
///
/// Construct with [`Halt::new`] (or [`Halt::default`]). The `Default`
/// impl forwards to `new()`, so both produce a fresh, uncancelled token.
/// The pair exists because clippy's `new_without_default` lint requires
/// `Default` whenever a public `new()` is present, even when the two
/// would do exactly the same thing.
#[derive(Clone, Debug)]
pub struct Halt(Arc<AtomicBool>);

impl Halt {
    /// Construct a fresh, uncancelled token.
    #[must_use]
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    /// Wrap an existing `Arc<AtomicBool>` as a `Halt`.
    ///
    /// Useful as a bridge during the 0.18 deprecation window: callers
    /// that already hold an `Arc<AtomicBool>` (e.g. `Drive::halt_flag()`,
    /// the deprecated `DiscStream::set_halt`) can adopt the token API
    /// without changing the underlying flag.
    ///
    /// The `Arc` is stored as-is, not copied into a new allocation, so
    /// the returned `Halt` and the caller's other handles to `flag` are
    /// views over one shared bit — cancelling through either is visible
    /// through the other.
    #[must_use]
    pub fn from_arc(flag: Arc<AtomicBool>) -> Self {
        Self(flag)
    }

    /// Borrow the underlying `Arc<AtomicBool>`.
    ///
    /// Used at boundaries with pre-`Halt` APIs that still take an
    /// `Arc<AtomicBool>` directly (`CopyOptions::halt`, the deprecated
    /// `DiscStream::set_halt`). Round 3 deletes those boundaries and
    /// this accessor with them.
    #[must_use]
    pub fn as_arc(&self) -> &Arc<AtomicBool> {
        &self.0
    }

    /// Flip the shared flag to cancelled. Idempotent: the store always
    /// writes `true`, so repeated calls leave the flag set.
    pub fn cancel(&self) {
        // Relaxed: the flag is advisory and publishes no other data.
        self.0.store(true, Ordering::Relaxed);
    }

    /// Read the shared flag; `true` once any clone has called
    /// [`Halt::cancel`].
    #[must_use = "polling the flag has no effect unless the result is acted on"]
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }
}

impl Default for Halt {
    /// Equivalent to [`Halt::new`]: a fresh, uncancelled token.
    fn default() -> Self {
        Self::new()
    }
}
73
/// Common polling cadence for every halt-aware loop.
///
/// While blocked on a worker, `bounded_syscall` re-checks both the
/// cancellation flag and its deadline once per `POLL_INTERVAL`;
/// `Pipeline::send_with_halt` retries its `try_send` on the same
/// cadence.
///
/// 250 ms balances two pressures: responsiveness (an operator who
/// presses Stop sees it take effect within roughly a quarter of a
/// second) against waste (an atomic load plus a clock read is cheap,
/// but not free when repeated thousands of times per second).
///
/// Defined once here so the half-dozen halt-polling loops across `io`
/// cannot silently drift apart.
pub const POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(250);
87
#[cfg(test)]
mod tests {
    use super::*;

    /// A newly constructed token reports "not cancelled".
    #[test]
    fn fresh_is_not_cancelled() {
        assert!(!Halt::new().is_cancelled());
    }

    /// `cancel()` moves the token from uncancelled to cancelled.
    #[test]
    fn cancel_flips_state() {
        let token = Halt::new();
        assert!(!token.is_cancelled());
        token.cancel();
        assert!(token.is_cancelled());
    }

    /// Cancelling twice leaves the token cancelled.
    #[test]
    fn cancel_is_idempotent() {
        let token = Halt::new();
        for _ in 0..2 {
            token.cancel();
        }
        assert!(token.is_cancelled());
    }

    /// Cancelling through a clone is visible through the original.
    #[test]
    fn clone_shares_state() {
        let source = Halt::new();
        let twin = source.clone();
        assert!([&source, &twin].iter().all(|t| !t.is_cancelled()));

        twin.cancel();
        assert!([&source, &twin].iter().all(|t| t.is_cancelled()));
    }

    /// Cancelling through the original is visible through a clone.
    #[test]
    fn clone_shares_state_reverse_direction() {
        let source = Halt::new();
        let twin = source.clone();

        source.cancel();
        assert!(twin.is_cancelled());
    }

    /// A clone moved into another thread flips the flag seen by the
    /// spawning thread once that thread has been joined.
    #[test]
    fn clone_shares_state_across_threads() {
        let watcher = Halt::new();
        let worker = {
            let remote = watcher.clone();
            std::thread::spawn(move || remote.cancel())
        };
        worker.join().unwrap();
        assert!(watcher.is_cancelled());
    }

    /// The 0.18 deprecation-window bridge: a `Halt` built by
    /// `from_arc` must be a *view* over the caller's bit, not a fresh
    /// copy, so a write on either side is seen on the other.
    #[test]
    fn from_arc_shares_state() {
        // Direction 1: cancel through the Halt, observe on the raw Arc.
        let shared = Arc::new(AtomicBool::new(false));
        let view = Halt::from_arc(Arc::clone(&shared));
        assert!(!view.is_cancelled());
        assert!(!shared.load(Ordering::Relaxed));
        view.cancel();
        assert!(shared.load(Ordering::Relaxed));

        // Direction 2: store through the raw Arc, observe on the Halt.
        let shared = Arc::new(AtomicBool::new(false));
        let view = Halt::from_arc(Arc::clone(&shared));
        shared.store(true, Ordering::Relaxed);
        assert!(view.is_cancelled());
    }

    /// `as_arc()` must expose the token's own backing flag: a store
    /// made through the borrowed `Arc` has to show up in
    /// `is_cancelled()`.
    #[test]
    fn as_arc_returns_backing_flag() {
        let token = Halt::new();
        let backing = Arc::clone(token.as_arc());
        assert!(!token.is_cancelled());
        backing.store(true, Ordering::Relaxed);
        assert!(token.is_cancelled());
    }
}
180}