halt/
lib.rs

1//! Provides worker threads that can be paused, stopped, and resumed.
2//!
3//! # Examples
4//!
5//! ```
6//! use std::time::Duration;
7//! use std::thread;
8//!
9//! // Create a worker thread
10//! let worker = halt::Worker::new();
11//!
12//! // Submit a task and wait for its result
13//! let rx = worker.run(|| 2 + 2).expect("task queued");
14//! let result = rx.recv().expect("task completed");
15//! assert_eq!(result, 4);
16//!
17//! // Pause the worker; tasks will queue but won’t run until resumed
18//! worker.pause();
19//!
20//! // Queue a task while paused
21//! let rx_paused = worker.run(|| {
22//!     // Simulate work
23//!     thread::sleep(Duration::from_millis(100));
24//!     "done while paused"
25//! }).expect("task queued");
26//!
27//! // Give a moment to show it's not running while paused
28//! thread::sleep(Duration::from_millis(50));
29//! assert!(worker.is_paused());
30//!
31//! // Resume to let the queued task run
32//! worker.resume();
33//! let msg = rx_paused.recv().expect("task completed");
34//! assert_eq!(msg, "done while paused");
35//!
36//! // Stop will cause the worker to skip tasks until resumed (or killed)
37//! worker.stop();
38//!
39//! // This task will be skipped because the worker is stopped
40//! let rx_skipped = worker.run(|| 42).expect("task queued");
41//!
42//! // Resume so the worker can process future tasks
43//! worker.resume();
44//!
45//! // The skipped task won't produce a value; recv will block forever.
46//! // Use try_recv or a timeout in real code to handle this:
47//! assert!(rx_skipped.try_recv().is_err());
48//!
49//! // Submit another task that will run now
50//! let rx2 = worker.run(|| "runs after resume").expect("task queued");
51//! assert_eq!(rx2.recv().unwrap(), "runs after resume");
52//! ```
53
54use std::sync::mpsc::{self, Receiver, SendError, Sender};
55use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
56use std::thread::{self, JoinHandle, Thread};
57
58use Signal::{Kill, Pause, Run, Stop};
59
60type Task = Box<dyn FnOnce() + Send>;
61
62/// A worker thread that can be paused, stopped, and resumed.
63#[derive(Debug)]
64pub struct Worker {
65    remote: Remote,
66    sender: Sender<Task>,
67    join_handle: JoinHandle<()>,
68}
69
70impl Drop for Worker {
71    fn drop(&mut self) {
72        self.remote.set(Kill);
73    }
74}
75
76impl Default for Worker {
77    fn default() -> Self {
78        Worker::new()
79    }
80}
81
82impl Worker {
83    /// Creates a new worker that is ready to run tasks.
84    pub fn new() -> Self {
85        let (sender, receiver) = mpsc::channel::<Task>();
86        let waiter = Waiter::default();
87        let remote = waiter.remote();
88
89        let join_handle = thread::spawn(move || {
90            while let Ok(task) = receiver.recv() {
91                let g = waiter.wait_while_paused();
92                match *g {
93                    Kill => return,
94                    Stop => continue,
95                    Run | Pause => drop(g),
96                }
97
98                task()
99            }
100        });
101
102        Worker {
103            remote,
104            sender,
105            join_handle,
106        }
107    }
108
109    /// Run `f` on the worker thread.
110    pub fn run<T>(
111        &self,
112        f: impl FnOnce() -> T + Send + 'static,
113    ) -> Result<Receiver<T>, SendError<Task>>
114    where
115        T: Send + 'static,
116    {
117        let (sender, receiver) = mpsc::sync_channel(1);
118
119        let task = Box::new(move || {
120            let x = f();
121            sender.send(x).ok();
122        });
123
124        self.sender.send(task).map(|_| receiver)
125    }
126
127    /// Returns the thread on which the worker is running.
128    pub fn thread(&self) -> &Thread {
129        self.join_handle.thread()
130    }
131
132    /// Resumes the `Worker` from a paused or stopped state into a running state.
133    pub fn resume(&self) -> bool {
134        self.remote.set(Run)
135    }
136
137    /// Pauses the `Worker`, causing it to sleep until resumed.
138    pub fn pause(&self) -> bool {
139        self.remote.set(Pause)
140    }
141
142    /// Stops the `Worker`, causing it to skip tasks.
143    pub fn stop(&self) -> bool {
144        self.remote.set(Stop)
145    }
146
147    /// Returns `true` if running.
148    pub fn is_running(&self) -> bool {
149        self.remote.is(Run)
150    }
151
152    /// Returns `true` if paused.
153    pub fn is_paused(&self) -> bool {
154        self.remote.is(Pause)
155    }
156
157    /// Returns `true` if stopped.
158    pub fn is_stopped(&self) -> bool {
159        self.remote.is(Stop)
160    }
161}
162
163/// Helper for pausing, stopping, and resuming across threads.
164#[derive(Debug, Default)]
165struct Waiter {
166    state: Arc<State>,
167}
168
169impl Waiter {
170    /// Returns a remote that allows for pausing, stopping, and resuming.
171    fn remote(&self) -> Remote {
172        Remote {
173            state: Arc::downgrade(&self.state),
174        }
175    }
176
177    /// Sleeps the current thread until resumed or stopped.
178    fn wait_while_paused(&self) -> MutexGuard<'_, Signal> {
179        let guard = self.state.signal.lock().unwrap();
180        self.state
181            .condvar
182            .wait_while(guard, |status| *status == Pause)
183            .unwrap()
184    }
185}
186
187/// A remote that allows for pausing, stopping, and resuming from another thread.
188#[derive(Debug)]
189struct Remote {
190    state: Weak<State>,
191}
192
193impl Remote {
194    fn set(&self, signal: Signal) -> bool {
195        self.state.upgrade().is_some_and(|state| state.set(signal))
196    }
197
198    fn is(&self, signal: Signal) -> bool {
199        self.state.upgrade().is_some_and(|state| state.is(signal))
200    }
201}
202
203#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
204enum Signal {
205    #[default]
206    Run,
207    Pause,
208    Stop,
209    Kill,
210}
211
212#[derive(Debug, Default)]
213struct State {
214    signal: Mutex<Signal>,
215    condvar: Condvar,
216}
217
218impl State {
219    fn set(&self, signal: Signal) -> bool {
220        let Ok(mut guard) = self.signal.lock() else {
221            return false;
222        };
223
224        *guard = signal;
225        self.condvar.notify_all();
226        true
227    }
228
229    fn is(&self, signal: Signal) -> bool {
230        self.signal.lock().is_ok_and(|guard| *guard == signal)
231    }
232}