use std::sync::mpsc::{self, Receiver, SendError, Sender};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak};
use std::thread::{self, JoinHandle, Thread};
use Signal::{Kill, Pause, Run, Stop};
/// A unit of work handed to the worker thread: a boxed closure that is called once.
type Task = Box<dyn FnOnce() + Send>;
/// Handle to a background thread that runs submitted closures one after another.
///
/// The thread is spawned by [`Worker::new`]. Tasks are queued through a channel and
/// the thread can be paused, resumed, or told to discard tasks via control signals.
#[derive(Debug)]
pub struct Worker {
/// Sets the control signal that the worker thread checks before each task.
remote: Remote,
/// Sending half of the channel the worker thread receives tasks from.
sender: Sender<Task>,
/// Join handle of the spawned worker thread.
join_handle: JoinHandle<()>,
}
impl Drop for Worker {
    /// Tells the worker thread to exit when the handle goes away.
    ///
    /// Setting `Kill` notifies the condition variable, so a thread that is currently
    /// waiting in the paused state wakes up and leaves its loop. The thread is not
    /// joined here.
    fn drop(&mut self) {
        // Best effort: `set` reports `false` when the thread's state is already gone,
        // in which case there is nothing left to signal.
        let _ = self.remote.set(Kill);
    }
}
impl Default for Worker {
fn default() -> Self {
Worker::new()
}
}
impl Worker {
    /// Spawns the worker thread and returns a handle to it.
    ///
    /// The thread takes tasks from the channel in the order they were sent. Before
    /// running each task it blocks for as long as the signal is `Pause`, then acts on
    /// the signal it observes: `Kill` ends the thread, `Stop` discards the task
    /// without running it, and anything else runs the task.
    pub fn new() -> Self {
        let waiter = Waiter::default();
        let remote = waiter.remote();
        let (sender, receiver) = mpsc::channel::<Task>();

        let join_handle = thread::spawn(move || {
            // The iterator ends once every sender has been dropped.
            for task in receiver.iter() {
                // Copy the signal out so the lock is released before the task is
                // run or dropped.
                let signal = *waiter.wait_while_paused();
                match signal {
                    Kill => break,
                    Stop => continue,
                    // `wait_while_paused` only returns once the signal is no longer
                    // `Pause`; the variant is listed to keep the match exhaustive.
                    Run | Pause => task(),
                }
            }
        });

        Self {
            remote,
            sender,
            join_handle,
        }
    }

    /// Queues `f` for execution on the worker thread.
    ///
    /// On success returns a receiver that yields the value returned by `f` once it
    /// has run. If the task is discarded instead of run, the receiver's sending half
    /// is dropped with it and receiving fails.
    ///
    /// # Errors
    ///
    /// Returns the boxed task inside a [`SendError`] when the worker thread is no
    /// longer receiving tasks.
    pub fn run<T>(
        &self,
        f: impl FnOnce() -> T + Send + 'static,
    ) -> Result<Receiver<T>, SendError<Task>>
    where
        T: Send + 'static,
    {
        let (result_tx, result_rx) = mpsc::sync_channel(1);
        let task: Task = Box::new(move || {
            // A failed send only means the caller dropped the receiver; ignore it.
            let _ = result_tx.send(f());
        });
        self.sender.send(task)?;
        Ok(result_rx)
    }

    /// Returns the [`Thread`] handle of the worker thread.
    pub fn thread(&self) -> &Thread {
        self.join_handle.thread()
    }

    /// Sets the signal to `Run`, so the thread executes tasks again.
    ///
    /// Returns `false` if the signal could not be set (the thread's shared state no
    /// longer exists, or its lock is poisoned).
    pub fn resume(&self) -> bool {
        self.remote.set(Run)
    }

    /// Sets the signal to `Pause`; the thread blocks before running its next task
    /// until the signal changes.
    ///
    /// Returns `false` if the signal could not be set.
    pub fn pause(&self) -> bool {
        self.remote.set(Pause)
    }

    /// Sets the signal to `Stop`; tasks the thread picks up while stopped are
    /// dropped without being run.
    ///
    /// Returns `false` if the signal could not be set.
    pub fn stop(&self) -> bool {
        self.remote.set(Stop)
    }

    /// Returns `true` if the current signal is `Run`.
    pub fn is_running(&self) -> bool {
        self.remote.is(Run)
    }

    /// Returns `true` if the current signal is `Pause`.
    pub fn is_paused(&self) -> bool {
        self.remote.is(Pause)
    }

    /// Returns `true` if the current signal is `Stop`.
    pub fn is_stopped(&self) -> bool {
        self.remote.is(Stop)
    }
}
/// Worker-thread side of the control state: holds the strong reference to [`State`]
/// and is what the thread blocks on while paused.
#[derive(Debug, Default)]
struct Waiter {
/// Shared signal and condition variable; [`Remote`] refers to it weakly.
state: Arc<State>,
}
impl Waiter {
fn remote(&self) -> Remote {
Remote {
state: Arc::downgrade(&self.state),
}
}
fn wait_while_paused(&self) -> MutexGuard<'_, Signal> {
let guard = self.state.signal.lock().unwrap();
self.state
.condvar
.wait_while(guard, |status| *status == Pause)
.unwrap()
}
}
/// Controlling side of the shared state: reads and sets the signal for as long as
/// the [`State`] is still alive.
#[derive(Debug)]
struct Remote {
/// Weak reference to the shared state; it does not keep the state alive.
state: Weak<State>,
}
impl Remote {
    /// Stores `signal` in the shared state.
    ///
    /// Returns `false` when the state has already been dropped or when the state
    /// itself reports that the signal could not be set.
    fn set(&self, signal: Signal) -> bool {
        match self.state.upgrade() {
            Some(state) => state.set(signal),
            None => false,
        }
    }

    /// Returns `true` only if the shared state still exists and its current signal
    /// equals `signal`.
    fn is(&self, signal: Signal) -> bool {
        match self.state.upgrade() {
            Some(state) => state.is(signal),
            None => false,
        }
    }
}
/// Control signal shared between a [`Worker`] handle and its thread.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
enum Signal {
/// Tasks are executed as they arrive. This is the initial signal.
#[default]
Run,
/// The worker thread blocks before running its next task until the signal changes.
Pause,
/// Tasks picked up by the worker thread are dropped without being run.
Stop,
/// The worker thread leaves its loop; set when the [`Worker`] is dropped.
Kill,
}
/// The current control signal together with the condition variable used to wake a
/// thread that is waiting for the signal to change.
#[derive(Debug, Default)]
struct State {
/// The current signal, guarded by a mutex.
signal: Mutex<Signal>,
/// Notified each time the signal is set.
condvar: Condvar,
}
impl State {
    /// Replaces the current signal with `signal` and wakes every thread waiting on
    /// the condition variable.
    ///
    /// Returns `false`, leaving the signal untouched, if the mutex is poisoned.
    fn set(&self, signal: Signal) -> bool {
        match self.signal.lock() {
            Ok(mut current) => {
                *current = signal;
                // Notify while the lock is still held.
                self.condvar.notify_all();
                true
            }
            Err(_) => false,
        }
    }

    /// Returns `true` if the current signal equals `signal`; a poisoned mutex
    /// counts as `false`.
    fn is(&self, signal: Signal) -> bool {
        match self.signal.lock() {
            Ok(current) => *current == signal,
            Err(_) => false,
        }
    }
}