//! A small in-process job queue.
//!
//! `SimqBuilder` registers a job function and spawns a dispatcher thread that runs each
//! submitted input on its own thread, up to a configured limit. `SimqChannel` is the handle
//! used to submit inputs and to collect results by request id.

use std::collections::HashMap;
use std::sync::mpsc::{RecvTimeoutError, SendError, Sender, channel};
use std::sync::{Arc, Mutex};
use std::thread::spawn;
/// Handle used to submit job inputs and to collect finished results.
///
/// `A` is the job input type and `B` is the job result type.
pub struct SimqChannel<A, B> {
    /// Sending half of the request queue; every message is `(request id, input)`.
    tx: Sender<(usize, A)>,
    /// Finished results keyed by request id, shared with the threads that produce them.
    results: Arc<Mutex<HashMap<usize, B>>>,
}

impl<A, B> SimqChannel<A, B> {
    /// Send data to the worker, returning the id of the request.
    ///
    /// Ids are handed out by a single process-wide counter, so they are unique across every
    /// `SimqChannel` in the program, not just this one.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError` (carrying the id and the unsent input) when the
    /// receiving side has gone away. The id is consumed even when sending fails.
    pub fn send(&self, params: A) -> Result<usize, SendError<(usize, A)>> {
        use std::sync::atomic::{AtomicUsize, Ordering};

        static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

        // Only uniqueness matters here; the channel send below is what hands the id to the
        // worker, so no stronger ordering than `Relaxed` is needed.
        let next = ID_COUNTER.fetch_add(1, Ordering::Relaxed);
        self.tx.send((next, params))?;
        Ok(next)
    }

    /// Take the result of a request, provided the request id.
    ///
    /// The result is moved out of the shared map, so each result can be retrieved only once.
    /// Returns `None` when no result is stored under `res_id` (not finished yet, already
    /// taken, or an unknown id) and also when the result map's lock is poisoned.
    pub fn get(&self, res_id: usize) -> Option<B> {
        self.results.lock().ok()?.remove(&res_id)
    }
}
/// Job builder.
///
/// Register the job with [`SimqBuilder::register`], optionally attach one post-processing
/// callback, then call [`SimqBuilder::spawn`] to start the worker.
pub struct SimqBuilder<A, B> {
    /// The job itself: turns one input into one result.
    func: fn(A) -> B,
    /// Upper bound on the number of jobs running at the same time.
    max_threads: usize,
    /// Read-only callback run on each result before it is stored.
    opt_and_then: Option<fn(&B)>,
    /// Mutating callback run on each result before it is stored.
    opt_and_then_mut: Option<fn(&mut B)>,
}

impl<A: Send + 'static, B: Send + 'static> SimqBuilder<A, B> {
    /// Register a job, designating the maximum number of parallel threads you wish to run
    /// the task on.
    ///
    /// A `max_threads` of 0 could never run anything, so it is treated as 1 when the worker
    /// is spawned.
    pub fn register(max_threads: usize, func: fn(A) -> B) -> Self {
        Self {
            func,
            max_threads,
            opt_and_then: None,
            opt_and_then_mut: None,
        }
    }

    /// ### CANNOT BE USED IN CONJUNCTION WITH `and_then_mut`
    /// After producing a result, do something with the result.
    /// This is good for additional work, like logging or storing to a database.
    ///
    /// Calling this clears any callback set through `and_then_mut`; the last call wins.
    pub fn and_then(mut self, and_then: fn(&B)) -> Self {
        self.opt_and_then = Some(and_then);
        self.opt_and_then_mut = None;
        self
    }

    /// ### CANNOT BE USED IN CONJUNCTION WITH `and_then`
    /// After producing a result, do something with the result (mutable).
    /// Same as the `and_then` callback, but with a mutable reference to the result.
    ///
    /// Calling this clears any callback set through `and_then`; the last call wins.
    pub fn and_then_mut(mut self, and_then_mut: fn(&mut B)) -> Self {
        self.opt_and_then_mut = Some(and_then_mut);
        self.opt_and_then = None;
        self
    }

    /// Launch the worker.
    ///
    /// A dispatcher thread receives requests and runs each one on its own thread, never
    /// running more than `max_threads` jobs at once. It keeps accepting and processing data
    /// as long as the returned `SimqChannel` is not dropped; requests that were already
    /// queued when the channel is dropped are still processed.
    ///
    /// If the job function or a callback panics, that request produces no result, but its
    /// concurrency slot is still returned so later requests keep running.
    pub fn spawn(self) -> SimqChannel<A, B> {
        use std::sync::{Condvar, PoisonError};
        use std::time::Duration;

        // Counting semaphore: number of free slots plus a signal raised whenever one frees up.
        type Slots = (Mutex<usize>, Condvar);

        // Owns one concurrency slot and gives it back on drop, which also covers unwinding
        // out of a panicking job.
        struct SlotGuard(Arc<Slots>);

        impl Drop for SlotGuard {
            fn drop(&mut self) {
                let (free, freed) = &*self.0;
                *free.lock().unwrap_or_else(PoisonError::into_inner) += 1;
                freed.notify_one();
            }
        }

        // Copy the plain fn pointers out of `self` so the closures below capture only them.
        let Self {
            func,
            max_threads,
            opt_and_then,
            opt_and_then_mut,
        } = self;

        let (tx, rx) = channel::<(usize, A)>();
        let result_map = Arc::new(Mutex::new(HashMap::new()));
        let job_result_map = Arc::clone(&result_map);

        spawn(move || {
            let poll_interval = Duration::from_millis(100);
            let slots: Arc<Slots> = Arc::new((Mutex::new(max_threads.max(1)), Condvar::new()));

            loop {
                let (res_id, params) = match rx.recv_timeout(poll_interval) {
                    Ok(job) => job,
                    // Nothing arrived within the interval; just wait again.
                    Err(RecvTimeoutError::Timeout) => continue,
                    // Sender disconnected; the SimqChannel was dropped, so stop this worker.
                    Err(RecvTimeoutError::Disconnected) => break,
                };

                // Block until a slot is free, then claim it.
                {
                    let (free, freed) = &*slots;
                    let mut available = free.lock().unwrap_or_else(PoisonError::into_inner);
                    while *available == 0 {
                        available = freed
                            .wait(available)
                            .unwrap_or_else(PoisonError::into_inner);
                    }
                    *available -= 1;
                }
                let permit = SlotGuard(Arc::clone(&slots));

                let thread_result_map = Arc::clone(&job_result_map);
                spawn(move || {
                    // Bound first so it is dropped last: the slot is released only after the
                    // result has been stored, or while unwinding if the job panics.
                    let _permit = permit;

                    let mut res = func(params);
                    if let Some(and_then) = opt_and_then {
                        and_then(&res);
                    } else if let Some(and_then_mut) = opt_and_then_mut {
                        and_then_mut(&mut res);
                    }
                    if let Ok(mut results) = thread_result_map.lock() {
                        results.insert(res_id, res);
                    }
                });
            }
        });

        SimqChannel {
            tx,
            results: result_map,
        }
    }
}