1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use crate::executor::context::ExecutorContext;
use crate::executor::stream::{OCLStream, OCLStreamSender};
use crate::utils::result::OCLStreamResult;
use ocl::ProQue;
use scheduled_thread_pool::ScheduledThreadPool;
use std::sync::Arc;
pub mod context;
pub mod stream;
#[derive(Clone)]
pub struct OCLStreamExecutor {
pro_que: ProQue,
pool: Arc<ScheduledThreadPool>,
concurrency: usize,
}
impl OCLStreamExecutor {
pub fn new(pro_que: ProQue) -> Self {
Self {
pro_que,
pool: Arc::new(ScheduledThreadPool::new(num_cpus::get())),
concurrency: 1,
}
}
pub fn set_concurrency(&mut self, mut num_tasks: usize) {
if num_tasks == 0 {
num_tasks = num_cpus::get();
}
self.concurrency = num_tasks;
}
pub fn set_pool(&mut self, pool: ScheduledThreadPool) {
self.pool = Arc::new(pool);
}
pub fn execute_bounded<F, T>(&self, size: usize, func: F) -> OCLStream<T>
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (stream, sender) = stream::bounded(size);
self.execute(func, sender);
stream
}
pub fn execute_unbounded<F, T>(&self, func: F) -> OCLStream<T>
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (stream, sender) = stream::unbounded();
self.execute(func, sender);
stream
}
fn execute<F, T>(&self, func: F, sender: OCLStreamSender<T>)
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let func = Arc::new(func);
for task_id in 0..(self.concurrency) {
let func = Arc::clone(&func);
let context = self.build_context(task_id, sender.clone());
self.pool.execute(move || {
let sender2 = context.sender().clone();
if let Err(e) = func(context) {
sender2.err(e).unwrap();
}
});
}
}
fn build_context<T>(&self, task_id: usize, sender: OCLStreamSender<T>) -> ExecutorContext<T>
where
T: Send + Sync,
{
ExecutorContext::new(self.pro_que.clone(), task_id, sender)
}
}