1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use ThreadState;
use workers::WorkerClosure;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, SendError, Sender};
use std::thread;
/// A worker that owns one background thread and feeds it messages of type `T`
/// through an `mpsc` channel.
///
/// Every message handed to `work_with` is passed, together with a reference to
/// the thread's own clone of `parameters`, to the closure `f`.
pub struct SingleWorker<T, P>
where
    T: 'static + Send,
    P: Clone + Send,
{
    /// Value cloned into each spawned thread and lent to `f` on every call.
    parameters: P,
    /// Closure run once per received message; shared with the thread via `Arc`.
    f: Arc<Box<WorkerClosure<T, P, Output = ()>>>,
    /// Receiving end of the channel. Kept here (not only in the thread) so a
    /// newly spawned thread can pick up the same queue.
    receiver: Arc<Mutex<Receiver<T>>>,
    /// Sending end of the channel, locked for the duration of each send.
    sender: Mutex<Sender<T>>,
    /// Liveness flag handed to the thread's `ThreadState` and polled by
    /// `is_alive`.
    alive: Arc<AtomicBool>,
}
impl<T: 'static + Debug + Send, P: 'static + Clone + Send> SingleWorker<T, P> {
    /// Creates a worker and starts its background thread.
    ///
    /// `parameters` is cloned into the thread; `f` is called with a reference
    /// to that clone and each message later passed to `work_with`. This call
    /// does not return until the new thread has marked itself alive.
    pub fn new(parameters: P, f: Box<WorkerClosure<T, P, Output = ()>>) -> SingleWorker<T, P> {
        let (sender, receiver) = channel::<T>();
        let worker = SingleWorker {
            parameters: parameters,
            f: Arc::new(f),
            receiver: Arc::new(Mutex::new(receiver)),
            sender: Mutex::new(sender),
            alive: Arc::new(AtomicBool::new(true)),
        };
        SingleWorker::spawn_thread(&worker);
        worker
    }

    /// Returns the current value of the shared liveness flag.
    fn is_alive(&self) -> bool {
        // `load` only needs `&AtomicBool`; cloning the `Arc` first is wasted work.
        self.alive.load(Ordering::Relaxed)
    }

    /// Spawns the thread that drains the channel and runs `f` on each message,
    /// then spins (yielding) until the liveness flag reads `true`.
    fn spawn_thread(worker: &SingleWorker<T, P>) {
        let mut alive = worker.alive.clone();
        let f = worker.f.clone();
        let receiver = worker.receiver.clone();
        let parameters = worker.parameters.clone();
        thread::spawn(move || {
            // NOTE(review): `ThreadState` is defined elsewhere; presumably it
            // clears the flag when dropped (e.g. on panic) - confirm.
            let state = ThreadState { alive: &mut alive };
            state.set_alive();
            // Recover the receiver even if the mutex was poisoned by an
            // earlier thread that panicked while holding it.
            let inbox = receiver
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            // `recv` fails only once every `Sender` has been dropped, after
            // which no message can ever arrive. Stop instead of spinning on
            // the error forever, so the thread ends with its worker.
            while let Ok(value) = inbox.recv() {
                f(&parameters, value);
            }
        });
        while !worker.is_alive() {
            thread::yield_now();
        }
    }

    /// Sends `msg` to the worker thread, first spawning a new thread if the
    /// liveness flag reports that the current one is not alive.
    ///
    /// # Errors
    ///
    /// Returns the `SendError` produced by the underlying `Sender::send`,
    /// which carries `msg` back to the caller.
    pub fn work_with(&self, msg: T) -> Result<(), SendError<T>> {
        if !self.is_alive() {
            SingleWorker::spawn_thread(self);
        }
        // A poisoned sender mutex still guards a usable `Sender`; keep going.
        let outbox = self
            .sender
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        outbox.send(msg)
    }
}