1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::executor::context::ExecutorContext;
use crate::executor::stream::{OCLStream, OCLStreamSender};
use crate::utils::result::OCLStreamResult;
use crate::utils::shared_buffer::SharedBuffer;
use ocl::{OclPrm, ProQue};
use std::sync::Arc;
use std::thread;
pub mod context;
pub mod stream;
#[derive(Clone)]
pub struct OCLStreamExecutor {
pro_que: ProQue,
concurrency: usize,
}
impl OCLStreamExecutor {
pub fn new(pro_que: ProQue) -> Self {
Self {
pro_que,
concurrency: 1,
}
}
pub fn set_concurrency(&mut self, mut num_tasks: usize) {
if num_tasks == 0 {
num_tasks = num_cpus::get();
}
self.concurrency = num_tasks;
}
pub fn execute_bounded<F, T>(&self, size: usize, func: F) -> OCLStream<T>
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (stream, sender) = stream::bounded(size);
self.execute(func, sender);
stream
}
pub fn pro_que(&self) -> &ProQue {
&self.pro_que
}
pub fn execute_unbounded<F, T>(&self, func: F) -> OCLStream<T>
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let (stream, sender) = stream::unbounded();
self.execute(func, sender);
stream
}
fn execute<F, T>(&self, func: F, sender: OCLStreamSender<T>)
where
F: Fn(ExecutorContext<T>) -> OCLStreamResult<()> + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let func = Arc::new(func);
for task_id in 0..(self.concurrency) {
let func = Arc::clone(&func);
let context = self.build_context(task_id, sender.clone());
thread::Builder::new()
.name(format!("ocl-{}", task_id))
.spawn(move || {
let sender = context.sender().clone();
if let Err(e) = func(context) {
if let Err(e) = sender.err(e) {
panic!("Failed to forward error to receiver: {}", e);
}
}
})
.expect("Failed to spawn ocl thread");
}
}
pub fn create_shared_buffer<T>(&self, len: usize) -> ocl::Result<SharedBuffer<T>>
where
T: OclPrm,
{
let buffer = self.pro_que.buffer_builder().len(len).build()?;
Ok(SharedBuffer::new(buffer))
}
fn build_context<T>(&self, task_id: usize, sender: OCLStreamSender<T>) -> ExecutorContext<T>
where
T: Send + Sync,
{
ExecutorContext::new(self.pro_que.clone(), task_id, sender)
}
}