1mod task_builder;
2mod kernel_arg;
3pub mod pipeline_task;
4use std::os::raw::c_void;
5use std::sync::Arc;
6
7use crate::{
8 async_executor::{task_builder::TaskBuilder, pipeline_task::PipelineBuilder},
9 cl_types::{
10 cl_buffer::ClBuffer,
11 cl_event::ClEvent,
12 cl_command_queue::{ClCommandQueue, command_queue_parameters::{CommandQueueProperties, Version20}},
13 cl_context::ClContext,
14 cl_device::ClDevice,
15 cl_platform::ClPlatform,
16 cl_kernel::ClKernel,
17 cl_device::opencl_version::OpenCLVersion,
18 cl_program::{ClProgram, Builded, NotBuilded, program_parameters::ProgramParameters},
19 cl_image::{ClImage, image_desc::ClImageDesc, image_formats::ClImageFormats},
20 cl_svm_buffer::ClSvmBuffer,
21 memory_flags::MemoryFlags,
22 },
23 error::ClError
24};
25
/// Owns one OpenCL context plus one command queue per device and hands out
/// builders and helpers that run work on those queues.
///
/// `queues`, `weights`, `device_versions` and `devices` are filled in the same
/// device order by the constructors, so index `i` refers to the same device in
/// all four vectors.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct AsyncExecutor {
    /// Context created over all devices managed by this executor.
    context: Arc<ClContext>,
    /// One command queue per device.
    queues: Vec<ClCommandQueue>,
    /// Capacity score of each device, as computed by `measure_device_capacity`.
    weights: Vec<u64>,
    /// Whether the queues were requested with profiling enabled.
    profiling_enabled: bool,
    /// OpenCL version reported by each device.
    device_versions: Vec<OpenCLVersion>,
    /// The devices the context and queues were created for.
    devices: Vec<ClDevice>,
}
42
43#[cfg(feature = "CL_VERSION_1_1")]
44impl AsyncExecutor {
45 pub fn new_best_platform() -> Result<Self, ClError> {
56 Self::new_best_platform_with_options(false)
57 }
58
59 pub fn new_best_platform_with_options(profiling_enabled: bool) -> Result<Self, ClError> {
64 let platforms = ClPlatform::get_all()?;
65
66 let scores: Vec<u64> = platforms
67 .iter()
68 .map(|f| Self::measure_platform_capacity(f))
69 .collect::<Result<Vec<u64>, ClError>>()?;
70
71 let best_score_index = {
72 let mut idx: usize = 0;
73 let mut max: u64 = 0;
74 for (i, v) in scores.iter().enumerate() {
75 if *v > max {
76 idx = i;
77 max = *v
78 }
79 }
80 idx
81 };
82
83 let devices = match platforms.get(best_score_index) {
84 Some(plat) => plat.get_all_devices()?,
85 None => {
86 return Err(ClError::Wrapper(
87 crate::error::wrapper_error::WrapperError::PlatformsNotFound,
88 ));
89 }
90 };
91
92 let context = Arc::new(ClContext::new(&devices)?);
93 let mut queues = Vec::new();
94 let mut weights = Vec::new();
95 let mut device_versions = Vec::new();
96
97 for device in &devices {
98 let version = device.get_opencl_version();
99 let mut supports_out_of_order = false;
100
101 let queue = if version >= OpenCLVersion::V2_0 {
102 if let Ok(host_props) = device.get_queue_on_host_properties() {
103 supports_out_of_order = (host_props as u64 & cl3::command_queue::CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
104 }
105
106 let properties = CommandQueueProperties::<Version20>::new()
107 .set_cl_queue_properties(supports_out_of_order, profiling_enabled, false, false)
108 .get_properties();
109 ClCommandQueue::create_command_queue_with_properties(&context, device, &properties)?
110 } else {
111 let mut properties = 0;
112 if profiling_enabled {
113 properties |= cl3::command_queue::CL_QUEUE_PROFILING_ENABLE;
114 }
115 #[allow(deprecated)]
116 ClCommandQueue::create_command_queue(&context, device, properties)?
117 };
118
119 queues.push(queue);
120 weights.push(Self::measure_device_capacity(device)?);
121 device_versions.push(version);
122 }
123
124 let executor = Self {
125 context,
126 queues,
127 weights,
128 device_versions,
129 profiling_enabled,
130 devices: devices.into_iter().map(|d| d.clone()).collect(),
131 };
132
133 Ok(executor)
134 }
135
136 pub fn new_all_platforms() -> Result<Self, ClError> {
137 let platforms = ClPlatform::get_all()?;
138 Self::new_from_platforms(&platforms)
139 }
140
141 pub fn new_from_platforms(platforms: &[ClPlatform]) -> Result<Self, ClError> {
142 let mut all_devices = Vec::new();
143 for platform in platforms {
144 let devices = platform.get_all_devices()?;
145 all_devices.extend(devices);
146 }
147 Self::new_from_devices(&all_devices)
148 }
149
150 pub fn new_from_devices(devices: &[ClDevice]) -> Result<Self, ClError> {
151 Self::new_from_devices_with_options(devices, false)
152 }
153
154 pub fn new_from_devices_with_options(devices: &[ClDevice], profiling_enabled: bool) -> Result<Self, ClError> {
155 let devices_vec = devices.to_vec();
156 let context = Arc::new(ClContext::new(&devices_vec)?);
157 let mut queues = Vec::new();
158 let mut weights = Vec::new();
159 let mut device_versions = Vec::new();
160
161 for device in devices {
162 let version = device.get_opencl_version();
163 let mut supports_out_of_order = false;
164
165 let queue = if version >= OpenCLVersion::V2_0 {
166 if let Ok(host_props) = device.get_queue_on_host_properties() {
167 supports_out_of_order = (host_props as u64 & cl3::command_queue::CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
168 }
169
170 let properties = CommandQueueProperties::<Version20>::new()
171 .set_cl_queue_properties(supports_out_of_order, profiling_enabled, false, false)
172 .get_properties();
173 ClCommandQueue::create_command_queue_with_properties(&context, device, &properties)?
174 } else {
175 let mut properties = 0;
176 if profiling_enabled {
177 properties |= cl3::command_queue::CL_QUEUE_PROFILING_ENABLE;
178 }
179 #[allow(deprecated)]
180 ClCommandQueue::create_command_queue(&context, device, properties)?
181 };
182
183 queues.push(queue);
184 weights.push(Self::measure_device_capacity(device)?);
185 device_versions.push(version);
186 }
187
188 Ok(Self {
189 context,
190 queues,
191 weights,
192 device_versions,
193 profiling_enabled,
194 devices: devices_vec,
195 })
196 }
197
198 pub fn is_profiling_enabled(&self) -> bool {
199 self.profiling_enabled
200 }
201
202 pub fn get_context(&self) -> Arc<ClContext> {
203 self.context.clone()
204 }
205
206 pub fn get_device_versions(&self) -> &[OpenCLVersion] {
207 &self.device_versions
208 }
209
210 pub fn get_devices(&self) -> Result<Vec<ClDevice>, ClError> {
211 Ok(self.devices.iter().map(|d| d.clone()).collect())
212 }
213
214 pub fn get_queues(&self) -> &[ClCommandQueue] {
215 &self.queues
216 }
217
218 pub fn create_task<'a>(&'a self, kernel: &'a ClKernel) -> TaskBuilder<'a> {
228 TaskBuilder::new(self, kernel)
229 }
230
231 pub fn create_pipeline(&self) -> PipelineBuilder<'_> {
236 PipelineBuilder::new(self)
237 }
238
239 pub fn build_program(&self, source: String, options: Option<&str>) -> Result<ClProgram<Builded>, ClError> {
251 let unbuilded = ClProgram::<NotBuilded>::from_src(&self.context, source)?;
252 let devices = self.context.get_devices()?;
253
254 let params = match options {
255 Some(opt) => ProgramParameters::default().custom(opt).get_parameters(),
256 None => ProgramParameters::default().get_parameters(),
257 };
258
259 unbuilded.build(¶ms, &devices)
260 }
261
262 pub fn compile_or_binary(
267 &self,
268 src_path: &str,
269 binary_dest_folder: &str,
270 options: Option<&str>,
271 ) -> Result<ClProgram<Builded>, ClError> {
272 use std::fs;
273 use std::path::Path;
274 use std::io::Read;
275 use crate::error::wrapper_error::WrapperError;
276
277 let path = Path::new(src_path);
278 let file_stem = path.file_stem()
279 .and_then(|s| s.to_str())
280 .ok_or(ClError::Wrapper(WrapperError::FailedToConvertStrToCString))?;
281
282 let devices = self.context.get_devices()?;
283
284 let mut binaries: Vec<Vec<u8>> = Vec::new();
285 let mut use_binaries = true;
286
287 for (i, device) in devices.iter().enumerate() {
288 let device_name = device.get_name()?.replace(" ", "_");
289 let bin_filename = format!("{}_{}_{}.bin", file_stem, device_name, i);
290 let bin_path = Path::new(binary_dest_folder).join(bin_filename);
291
292 if bin_path.exists() {
293 match fs::read(&bin_path) {
294 Ok(content) => binaries.push(content),
295 Err(_) => {
296 use_binaries = false;
297 break;
298 }
299 }
300 } else {
301 use_binaries = false;
302 break;
303 }
304 }
305
306 if use_binaries && binaries.len() == devices.len() {
307 let binary_slices: Vec<&[u8]> = binaries.iter().map(|b| b.as_slice()).collect();
308
309 match ClProgram::<NotBuilded>::from_binary(&self.context, &devices, &binary_slices) {
310 Ok(program) => {
311 let params = match options {
312 Some(opt) => ProgramParameters::default().custom(opt).get_parameters(),
313 None => ProgramParameters::default().get_parameters(),
314 };
315
316 match program.build(¶ms, &devices) {
317 Ok(built) => return Ok(built),
318 Err(_) => {
319 }
321 }
322 },
323 Err(_) => {
324 }
326 }
327 }
328
329 let mut src_content = String::new();
331 fs::File::open(src_path)
332 .map_err(|_| ClError::Wrapper(WrapperError::FileIOError))?
333 .read_to_string(&mut src_content)
334 .map_err(|_| ClError::Wrapper(WrapperError::FileIOError))?;
335
336 let built_program = self.build_program(src_content, options)?;
337
338 let _ = built_program.save_binary(binary_dest_folder, file_stem);
340
341 Ok(built_program)
342 }
343
344 pub fn create_kernel(&self, program: &ClProgram<Builded>, name: &str) -> Result<ClKernel, ClError> {
347 ClKernel::new(program, name)
348 }
349
350 pub fn create_buffer(&self, flags: &[MemoryFlags], size: usize, host_ptr: *mut c_void) -> Result<ClBuffer, ClError> {
354 ClBuffer::new(&self.context, &flags.to_vec(), size, host_ptr)
355 }
356
/// Creates an image in the executor's context.
///
/// # Arguments
/// * `flags` - memory flags, copied into a `Vec` before the call.
/// * `format` / `desc` - image format and descriptor, forwarded unchanged.
/// * `host_ptr` - raw host pointer forwarded unchanged to `ClImage::new`.
///
/// # Errors
/// Returns the error reported by `ClImage::new`.
#[cfg(feature = "CL_VERSION_1_2")]
pub fn create_image(
    &self,
    flags: &[MemoryFlags],
    format: &ClImageFormats,
    desc: &ClImageDesc,
    host_ptr: *mut c_void
) -> Result<ClImage, ClError> {
    let flag_list = flags.to_vec();
    ClImage::new(&self.context, &flag_list, format, desc, host_ptr)
}
369
/// Creates an SVM buffer of element type `T` in the executor's context.
///
/// # Arguments
/// * `flags` - memory flags, copied into a `Vec` before the call.
/// * `len` - forwarded unchanged to `ClSvmBuffer::new`.
///
/// # Errors
/// Returns the error reported by `ClSvmBuffer::new`.
#[cfg(feature = "CL_VERSION_2_0")]
pub fn create_svm_buffer<T>(&self, flags: &[MemoryFlags], len: usize) -> Result<ClSvmBuffer<T>, ClError> {
    let flag_list = flags.to_vec();
    // NOTE(review): the trailing `0` is presumably the alignment — confirm
    // against `ClSvmBuffer::new`.
    ClSvmBuffer::<T>::new(&self.context, &flag_list, len, 0)
}
376
377 pub async fn read_buffer<T: Sized>(
380 &self,
381 buffer: &ClBuffer,
382 host_memory: &mut [T],
383 ) -> Result<ClEvent, ClError> {
384 let queue = self.get_optimal_queue();
385 queue.enqueue_read_buffer(buffer, None, host_memory, None).await
386 }
387
388 pub async fn write_buffer<T: Sized>(
391 &self,
392 buffer: &ClBuffer,
393 host_memory: &mut [T],
394 ) -> Result<ClEvent, ClError> {
395 let queue = self.get_optimal_queue();
396 let size = host_memory.len() * std::mem::size_of::<T>();
397 queue.write_buffer(buffer, host_memory.as_mut_ptr() as *mut c_void, 0, size, None).await
398 }
399
/// Enqueues a read of the `region` of `image` starting at `origin` into
/// `host_memory` on the queue chosen by `get_optimal_queue`, and awaits the
/// resulting event.
///
/// # Errors
/// Returns the error reported by `read_image_raw`.
#[cfg(feature = "CL_VERSION_1_2")]
pub async fn read_image<T: Sized>(
    &self,
    image: &ClImage,
    host_memory: &mut [T],
    origin: [usize; 3],
    region: [usize; 3],
) -> Result<ClEvent, ClError> {
    // NOTE(review): the two `0` arguments are presumably row and slice pitch
    // — confirm against `read_image_raw`.
    self.get_optimal_queue()
        .read_image_raw(image, origin, region, 0, 0, host_memory.as_mut_ptr() as *mut c_void, None)
        .await
}
420
/// Enqueues a write of `host_memory` into the `region` of `image` starting at
/// `origin` on the queue chosen by `get_optimal_queue`, and awaits the
/// resulting event.
///
/// # Errors
/// Returns the error reported by `write_image_raw`.
#[cfg(feature = "CL_VERSION_1_2")]
pub async fn write_image<T: Sized>(
    &self,
    image: &ClImage,
    host_memory: &mut [T],
    origin: [usize; 3],
    region: [usize; 3],
) -> Result<ClEvent, ClError> {
    // NOTE(review): the two `0` arguments are presumably row and slice pitch
    // — confirm against `write_image_raw`.
    self.get_optimal_queue()
        .write_image_raw(image, origin, region, 0, 0, host_memory.as_mut_ptr() as *mut c_void, None)
        .await
}
441
442 fn get_optimal_queue(&self) -> &ClCommandQueue {
448 let mut max_weight = 0;
449 let mut idx = 0;
450 for (i, &weight) in self.weights.iter().enumerate() {
451 if weight > max_weight {
452 max_weight = weight;
453 idx = i;
454 }
455 }
456 &self.queues[idx]
457 }
458
459 fn measure_platform_capacity(platform: &ClPlatform) -> Result<u64, ClError> {
460 let mut score: u64 = 0;
461
462 let devices = platform.get_all_devices()?;
463 for device in &devices {
464 score += Self::measure_device_capacity(device)?;
465 }
466 Ok(score)
467 }
468
469 fn measure_device_capacity(device: &ClDevice) -> Result<u64, ClError> {
470 let compute_units = device.get_max_compute_units()?;
471 let clock_frequency = device.get_max_clock_frequency()?;
472 let memory = device.get_global_mem_size()? / (1024 * 1024);
473 let device_type = device.get_device_type()?;
474
475 let mut score = ((compute_units as u64 * clock_frequency as u64) / 100) + (memory / 10);
476
477 if (device_type & cl3::device::CL_DEVICE_TYPE_GPU) != 0 {
481 score += 1_000_000;
482 } else if (device_type & cl3::device::CL_DEVICE_TYPE_ACCELERATOR) != 0 {
483 score += 500_000;
484 }
485
486 Ok(score)
487 }
488}
489
// SAFETY: NOTE(review): these impls assert that sharing and moving the
// executor (its context, queues and devices) across threads is sound. That
// depends on the thread-safety of the wrapped OpenCL handle types, which is
// not visible from this file — confirm against `ClContext`, `ClCommandQueue`
// and `ClDevice` before relying on it.
//
// Gated like the struct itself so the crate still compiles when the
// `CL_VERSION_1_1` feature is disabled.
#[cfg(feature = "CL_VERSION_1_1")]
unsafe impl Sync for AsyncExecutor {}
#[cfg(feature = "CL_VERSION_1_1")]
unsafe impl Send for AsyncExecutor {}