halt/lib.rs
1//! Provides worker threads that can be paused, stopped, and resumed.
2//!
3//! # Examples
4//!
5//! ```
6//! use std::time::Duration;
7//! use std::thread;
8//!
9//! // Create a worker thread
10//! let worker = halt::Worker::new();
11//!
12//! // Submit a task and wait for its result
13//! let rx = worker.run(|| 2 + 2).expect("task queued");
14//! let result = rx.recv().expect("task completed");
15//! assert_eq!(result, 4);
16//!
17//! // Pause the worker; tasks will queue but won’t run until resumed
18//! worker.pause();
19//!
20//! // Queue a task while paused
21//! let rx_paused = worker.run(|| {
22//! // Simulate work
23//! thread::sleep(Duration::from_millis(100));
24//! "done while paused"
25//! }).expect("task queued");
26//!
27//! // Give a moment to show it's not running while paused
28//! thread::sleep(Duration::from_millis(50));
29//! assert!(worker.is_paused());
30//!
31//! // Resume to let the queued task run
32//! worker.resume();
33//! let msg = rx_paused.recv().expect("task completed");
34//! assert_eq!(msg, "done while paused");
35//!
36//! // Stop will cause the worker to skip tasks until resumed (or killed)
37//! worker.stop();
38//!
39//! // This task will be skipped because the worker is stopped
40//! let rx_skipped = worker.run(|| 42).expect("task queued");
41//!
//! // The skipped task is dropped without running, so it never produces a
//! // value; `recv` returns an error once the worker has discarded it.
//! assert!(rx_skipped.recv().is_err());
//!
//! // Resume so the worker can process future tasks
//! worker.resume();
48//!
49//! // Submit another task that will run now
50//! let rx2 = worker.run(|| "runs after resume").expect("task queued");
51//! assert_eq!(rx2.recv().unwrap(), "runs after resume");
52//! ```
53
54use std::sync::mpsc::{self, Receiver, SendError, Sender};
55use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
56use std::thread::{self, JoinHandle, Thread};
57
58use Signal::{Kill, Pause, Run, Stop};
59
/// A boxed, sendable closure queued for execution on the worker thread.
type Task = Box<dyn FnOnce() + Send + 'static>;
61
62/// A worker thread that can be paused, stopped, and resumed.
63#[derive(Debug)]
64pub struct Worker {
65 remote: Remote,
66 sender: Sender<Task>,
67 join_handle: JoinHandle<()>,
68}
69
70impl Drop for Worker {
71 fn drop(&mut self) {
72 self.remote.set(Kill);
73 }
74}
75
76impl Default for Worker {
77 fn default() -> Self {
78 Worker::new()
79 }
80}
81
82impl Worker {
83 /// Creates a new worker that is ready to run tasks.
84 pub fn new() -> Self {
85 let (sender, receiver) = mpsc::channel::<Task>();
86 let waiter = Waiter::default();
87 let remote = waiter.remote();
88
89 let join_handle = thread::spawn(move || {
90 while let Ok(task) = receiver.recv() {
91 let g = waiter.wait_while_paused();
92 match *g {
93 Kill => return,
94 Stop => continue,
95 Run | Pause => drop(g),
96 }
97
98 task()
99 }
100 });
101
102 Worker {
103 remote,
104 sender,
105 join_handle,
106 }
107 }
108
109 /// Run `f` on the worker thread.
110 pub fn run<T>(
111 &self,
112 f: impl FnOnce() -> T + Send + 'static,
113 ) -> Result<Receiver<T>, SendError<Task>>
114 where
115 T: Send + 'static,
116 {
117 let (sender, receiver) = mpsc::sync_channel(1);
118
119 let task = Box::new(move || {
120 let x = f();
121 sender.send(x).ok();
122 });
123
124 self.sender.send(task).map(|_| receiver)
125 }
126
127 /// Returns the thread on which the worker is running.
128 pub fn thread(&self) -> &Thread {
129 self.join_handle.thread()
130 }
131
132 /// Resumes the `Worker` from a paused or stopped state into a running state.
133 pub fn resume(&self) -> bool {
134 self.remote.set(Run)
135 }
136
137 /// Pauses the `Worker`, causing it to sleep until resumed.
138 pub fn pause(&self) -> bool {
139 self.remote.set(Pause)
140 }
141
142 /// Stops the `Worker`, causing it to skip tasks.
143 pub fn stop(&self) -> bool {
144 self.remote.set(Stop)
145 }
146
147 /// Returns `true` if running.
148 pub fn is_running(&self) -> bool {
149 self.remote.is(Run)
150 }
151
152 /// Returns `true` if paused.
153 pub fn is_paused(&self) -> bool {
154 self.remote.is(Pause)
155 }
156
157 /// Returns `true` if stopped.
158 pub fn is_stopped(&self) -> bool {
159 self.remote.is(Stop)
160 }
161}
162
163/// Helper for pausing, stopping, and resuming across threads.
164#[derive(Debug, Default)]
165struct Waiter {
166 state: Arc<State>,
167}
168
169impl Waiter {
170 /// Returns a remote that allows for pausing, stopping, and resuming.
171 fn remote(&self) -> Remote {
172 Remote {
173 state: Arc::downgrade(&self.state),
174 }
175 }
176
177 /// Sleeps the current thread until resumed or stopped.
178 fn wait_while_paused(&self) -> MutexGuard<'_, Signal> {
179 let guard = self.state.signal.lock().unwrap();
180 self.state
181 .condvar
182 .wait_while(guard, |status| *status == Pause)
183 .unwrap()
184 }
185}
186
187/// A remote that allows for pausing, stopping, and resuming from another thread.
188#[derive(Debug)]
189struct Remote {
190 state: Weak<State>,
191}
192
193impl Remote {
194 fn set(&self, signal: Signal) -> bool {
195 self.state.upgrade().is_some_and(|state| state.set(signal))
196 }
197
198 fn is(&self, signal: Signal) -> bool {
199 self.state.upgrade().is_some_and(|state| state.is(signal))
200 }
201}
202
/// The instruction the worker thread checks before running each task.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum Signal {
    /// Run tasks as they are dequeued. This is the initial signal.
    #[default]
    Run,
    /// Block before running the next task until the signal changes.
    Pause,
    /// Drop dequeued tasks without running them.
    Stop,
    /// Make the worker loop return.
    Kill,
}
211
212#[derive(Debug, Default)]
213struct State {
214 signal: Mutex<Signal>,
215 condvar: Condvar,
216}
217
218impl State {
219 fn set(&self, signal: Signal) -> bool {
220 let Ok(mut guard) = self.signal.lock() else {
221 return false;
222 };
223
224 *guard = signal;
225 self.condvar.notify_all();
226 true
227 }
228
229 fn is(&self, signal: Signal) -> bool {
230 self.signal.lock().is_ok_and(|guard| *guard == signal)
231 }
232}