1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use std::thread;
use std::future::Future;
use crossbeam::channel::{self, Sender};
use crate::*;
/// Unsized closure type for a single unit of work.
///
/// The closure runs at most once (`FnOnce`), receives exclusive access to the
/// worker's `ThreadState`, and returns a `Result`. The `Send + 'static` bounds
/// allow it to be moved to another thread.
///
/// Note that `Result` here is a generic type parameter, not `std::result::Result`.
type TaskFunc<ThreadState, Result> = dyn FnOnce(&mut ThreadState) -> Result + Send + 'static;
/// Heap-allocated, type-erased [`TaskFunc`], so closures of different concrete
/// types can share one channel element type.
type BoxedTaskFunc<ThreadState, Result> = Box<TaskFunc<ThreadState, Result>>;
/// Handle for submitting tasks that operate on a `ThreadState` and produce a `Result`.
///
/// The handle itself only stores the sending half of a task channel; each
/// queued item is a `Task` carrying a boxed closure.
/// `Result` is a generic type parameter, not `std::result::Result`.
pub struct WorkerThread<ThreadState, Result> {
// Sending half of the channel over which boxed tasks are submitted.
sender: Sender<Task<BoxedTaskFunc<ThreadState, Result>, Result>>,
}
impl<ThreadState, Result> Clone for WorkerThread<ThreadState, Result> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<ThreadState, Result> WorkerThread<ThreadState, Result>
where
    ThreadState: Default + Send + 'static,
    Result: Send + 'static,
{
    /// Starts a worker whose state is initialised to `ThreadState::default()`.
    ///
    /// Convenience wrapper around `spawn_with`.
    pub fn spawn() -> Self {
        let initial_state = ThreadState::default();
        Self::spawn_with(initial_state)
    }
}
impl<ThreadState, Result> WorkerThread<ThreadState, Result>
where
    ThreadState: Send + 'static,
    Result: Send + 'static,
{
    /// Spawns a dedicated thread that owns `data` and returns a handle to it.
    ///
    /// The thread pulls tasks off an unbounded channel and runs them one at a
    /// time, in the order received, handing each one `&mut data`. Each task's
    /// return value is passed to `complete` on the future stored in that task.
    ///
    /// The thread exits when `recv` fails, which for a crossbeam channel means
    /// the queue is empty and every sender (every clone of the returned handle)
    /// has been dropped.
    pub fn spawn_with(mut data: ThreadState) -> Self {
        // The element type is spelled out here because the closure below uses
        // `task.func` / `task.future` before the compiler sees `input_tx`
        // stored in `Self`.
        let (input_tx, input_rx) =
            channel::unbounded::<Task<BoxedTaskFunc<ThreadState, Result>, Result>>();
        thread::spawn(move || {
            // A failed `recv` is the shutdown signal, not an error to report.
            while let Ok(task) = input_rx.recv() {
                let result = (task.func)(&mut data);
                task.future.complete(result);
            }
        });
        Self { sender: input_tx }
    }

    /// Runs `func` on the worker thread and resolves to its return value.
    ///
    /// Because this is an `async fn`, nothing is queued until the returned
    /// future is first polled.
    ///
    /// # Panics
    ///
    /// Panics (when polled) if the task cannot be sent because the worker
    /// thread is no longer receiving.
    pub async fn work_on<F>(&self, func: F) -> Result
    where
        F: FnOnce(&mut ThreadState) -> Result + Send + 'static,
    {
        self.work_on_boxed_inner(Box::new(func)).await
    }

    /// Same as [`work_on`](Self::work_on), for a closure that is already boxed.
    ///
    /// # Panics
    ///
    /// Panics (when polled) if the task cannot be sent because the worker
    /// thread is no longer receiving.
    pub async fn work_on_boxed(&self, func: BoxedTaskFunc<ThreadState, Result>) -> Result {
        self.work_on_boxed_inner(func).await
    }

    /// Queues `func` immediately and returns the future that yields its result.
    ///
    /// One `MutexFuture` clone travels with the task (the worker calls
    /// `complete` on it); the other is returned to the caller.
    /// NOTE(review): this relies on `MutexFuture` clones sharing completion
    /// state — confirm against its definition elsewhere in the crate.
    fn work_on_boxed_inner(
        &self,
        func: BoxedTaskFunc<ThreadState, Result>,
    ) -> impl Future<Output = Result> {
        let future = MutexFuture::new();
        let caller_side = future.clone();
        // While `self.sender` is alive the worker never sees a disconnected
        // channel, so a send failure means the thread unwound and dropped the
        // receiver.
        self.sender
            .send(Task { func, future })
            .expect("worker thread is no longer running (a previous task probably panicked)");
        caller_side
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives the async scenario below to completion on the current thread.
    #[test]
    fn test_worker_thread() {
        futures::executor::block_on(test());
    }

    /// Submits three tasks that mutate the worker's `i64` state (default 0)
    /// and checks the value each task observed.
    async fn test() {
        let worker = WorkerThread::spawn();

        // Both futures are created up front but awaited one after the other.
        let plus_three = worker.work_on(|state: &mut i64| {
            *state += 3;
            *state
        });
        let times_two = worker.work_on(|state: &mut i64| {
            *state *= 2;
            *state
        });
        let first = plus_three.await;
        let second = times_two.await;

        let negate = worker.work_on(|state: &mut i64| {
            *state *= -1;
            *state
        });
        let third = negate.await;

        // 0 + 3 = 3, 3 * 2 = 6, 6 * -1 = -6.
        assert_eq!((first, second, third), (3, 6, -6));
    }
}