Skip to main content

easy_async_opencl3/async_executor/
mod.rs

1mod task_builder;
2mod kernel_arg;
3pub mod pipeline_task;
4use std::os::raw::c_void;
5use std::sync::Arc;
6
7use crate::{
8    async_executor::{task_builder::TaskBuilder, pipeline_task::PipelineBuilder}, 
9    cl_types::{
10        cl_buffer::ClBuffer,
11        cl_event::ClEvent,
12        cl_command_queue::{ClCommandQueue, command_queue_parameters::{CommandQueueProperties, Version20}},
13        cl_context::ClContext, 
14        cl_device::ClDevice, 
15        cl_platform::ClPlatform,
16        cl_kernel::ClKernel,
17        cl_device::opencl_version::OpenCLVersion,
18        cl_program::{ClProgram, Builded, NotBuilded, program_parameters::ProgramParameters},
19        cl_image::{ClImage, image_desc::ClImageDesc, image_formats::ClImageFormats},
20        cl_svm_buffer::ClSvmBuffer,
21        memory_flags::MemoryFlags,
22    }, 
23    error::ClError
24};
25
/// # AsyncExecutor
///
/// The `AsyncExecutor` is the central engine of this library. Its job is to simplify
/// all the "dirty work" of OpenCL: finding the best graphics card, creating the context,
/// managing command queues, and distributing work intelligently.
///
/// Think of it as an orchestra conductor that decides which musicians (GPUs) play each part.
///
/// The `queues`, `weights` and `device_versions` vectors are filled in the same
/// per-device loop by the constructors, so entry `i` of each one refers to the
/// same device.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct AsyncExecutor {
    /// Context created over all managed devices; shared through `Arc`.
    context: Arc<ClContext>,
    /// One command queue per device, in device order.
    queues: Vec<ClCommandQueue>,
    /// Capacity score of each device as returned by `measure_device_capacity`.
    weights: Vec<u64>,
    /// The profiling flag that was passed to the constructor.
    profiling_enabled: bool,
    /// OpenCL version reported by each device.
    device_versions: Vec<OpenCLVersion>,
    /// The devices this executor was built from.
    devices: Vec<ClDevice>,
}
42
43#[cfg(feature = "CL_VERSION_1_1")]
44impl AsyncExecutor {
45    //
46    //
47    // Static
48    //
49    //
50
51    /// Creates an executor by automatically selecting the best available platform.
52    /// 
53    /// It will search among all cards (NVIDIA, AMD, Intel) and choose the one with
54    /// the most computing power and memory.
55    pub fn new_best_platform() -> Result<Self, ClError> {
56        Self::new_best_platform_with_options(false)
57    }
58
59    /// Creates an executor with the best platform, allowing profiling to be enabled.
60    /// 
61    /// If `profiling_enabled` is true, you can measure exactly how many nanoseconds
62    /// each kernel takes to execute on the card.
63    pub fn new_best_platform_with_options(profiling_enabled: bool) -> Result<Self, ClError> {
64        let platforms = ClPlatform::get_all()?;
65
66        let scores: Vec<u64> = platforms
67            .iter()
68            .map(|f| Self::measure_platform_capacity(f))
69            .collect::<Result<Vec<u64>, ClError>>()?;
70
71        let best_score_index = {
72            let mut idx: usize = 0;
73            let mut max: u64 = 0;
74            for (i, v) in scores.iter().enumerate() {
75                if *v > max {
76                    idx = i;
77                    max = *v
78                }
79            }
80            idx
81        };
82
83        let devices = match platforms.get(best_score_index) {
84            Some(plat) => plat.get_all_devices()?,
85            None => {
86                return Err(ClError::Wrapper(
87                    crate::error::wrapper_error::WrapperError::PlatformsNotFound,
88                ));
89            }
90        };
91
92        let context = Arc::new(ClContext::new(&devices)?);
93        let mut queues = Vec::new();
94        let mut weights = Vec::new();
95        let mut device_versions = Vec::new();
96
97        for device in &devices {
98            let version = device.get_opencl_version();
99            let mut supports_out_of_order = false;
100
101            let queue = if version >= OpenCLVersion::V2_0 {
102                if let Ok(host_props) = device.get_queue_on_host_properties() {
103                    supports_out_of_order = (host_props as u64 & cl3::command_queue::CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
104                }
105                
106                let properties = CommandQueueProperties::<Version20>::new()
107                    .set_cl_queue_properties(supports_out_of_order, profiling_enabled, false, false)
108                    .get_properties();
109                ClCommandQueue::create_command_queue_with_properties(&context, device, &properties)?
110            } else {
111                let mut properties = 0;
112                if profiling_enabled {
113                    properties |= cl3::command_queue::CL_QUEUE_PROFILING_ENABLE;
114                }
115                #[allow(deprecated)]
116                ClCommandQueue::create_command_queue(&context, device, properties)?
117            };
118            
119            queues.push(queue);
120            weights.push(Self::measure_device_capacity(device)?);
121            device_versions.push(version);
122        }
123
124        let executor = Self {
125            context,
126            queues,
127            weights,
128            device_versions,
129            profiling_enabled,
130            devices: devices.into_iter().map(|d| d.clone()).collect(),
131        };
132
133        Ok(executor)
134    }
135
136    pub fn new_all_platforms() -> Result<Self, ClError> {
137        let platforms = ClPlatform::get_all()?;
138        Self::new_from_platforms(&platforms)
139    }
140
141    pub fn new_from_platforms(platforms: &[ClPlatform]) -> Result<Self, ClError> {
142        let mut all_devices = Vec::new();
143        for platform in platforms {
144            let devices = platform.get_all_devices()?;
145            all_devices.extend(devices);
146        }
147        Self::new_from_devices(&all_devices)
148    }
149
150    pub fn new_from_devices(devices: &[ClDevice]) -> Result<Self, ClError> {
151        Self::new_from_devices_with_options(devices, false)
152    }
153
154    pub fn new_from_devices_with_options(devices: &[ClDevice], profiling_enabled: bool) -> Result<Self, ClError> {
155        let devices_vec = devices.to_vec();
156        let context = Arc::new(ClContext::new(&devices_vec)?);
157        let mut queues = Vec::new();
158        let mut weights = Vec::new();
159        let mut device_versions = Vec::new();
160
161        for device in devices {
162            let version = device.get_opencl_version();
163            let mut supports_out_of_order = false;
164
165            let queue = if version >= OpenCLVersion::V2_0 {
166                if let Ok(host_props) = device.get_queue_on_host_properties() {
167                    supports_out_of_order = (host_props as u64 & cl3::command_queue::CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0;
168                }
169
170                let properties = CommandQueueProperties::<Version20>::new()
171                    .set_cl_queue_properties(supports_out_of_order, profiling_enabled, false, false)
172                    .get_properties();
173                ClCommandQueue::create_command_queue_with_properties(&context, device, &properties)?
174            } else {
175                let mut properties = 0;
176                if profiling_enabled {
177                    properties |= cl3::command_queue::CL_QUEUE_PROFILING_ENABLE;
178                }
179                #[allow(deprecated)]
180                ClCommandQueue::create_command_queue(&context, device, properties)?
181            };
182
183            queues.push(queue);
184            weights.push(Self::measure_device_capacity(device)?);
185            device_versions.push(version);
186        }
187
188        Ok(Self {
189            context,
190            queues,
191            weights,
192            device_versions,
193            profiling_enabled,
194            devices: devices_vec,
195        })
196    }
197
    /// Returns the profiling flag this executor was constructed with.
    pub fn is_profiling_enabled(&self) -> bool {
        self.profiling_enabled
    }
201
202    pub fn get_context(&self) -> Arc<ClContext> {
203        self.context.clone()
204    }
205
206    pub fn get_device_versions(&self) -> &[OpenCLVersion] {
207        &self.device_versions
208    }
209
210    pub fn get_devices(&self) -> Result<Vec<ClDevice>, ClError> {
211        Ok(self.devices.iter().map(|d| d.clone()).collect())
212    }
213
214    pub fn get_queues(&self) -> &[ClCommandQueue] {
215        &self.queues
216    }
217
218    //
219    // Engine (Self)
220    //
221
    /// Creates a task to be executed.
    ///
    /// This is the entry point for the declarative workflow.
    /// It receives a `ClKernel` (the function that will run on the GPU) and returns
    /// a `TaskBuilder` to configure the arguments and work size.
    ///
    /// The returned builder borrows both this executor and `kernel` for `'a`,
    /// so neither can be dropped while the builder is alive.
    pub fn create_task<'a>(&'a self, kernel: &'a ClKernel) -> TaskBuilder<'a> {
        TaskBuilder::new(self, kernel)
    }
230
    /// Creates a pipeline to execute multiple kernels sequentially.
    ///
    /// This is ideal for cases where the output of one kernel is the input of another,
    /// as it manages dependencies (events) automatically.
    ///
    /// The returned `PipelineBuilder` borrows this executor.
    pub fn create_pipeline(&self) -> PipelineBuilder<'_> {
        PipelineBuilder::new(self)
    }
238
239    //
    // Facade Methods (simplify resource creation)
241    //
242
243    /// Compiles an OpenCL program from source code (C-like).
244    /// 
245    /// # Example
246    /// ```rust
247    /// let source = "kernel void add(...) { ... }";
248    /// let program = executor.build_program(source.to_string(), None)?;
249    /// ```
250    pub fn build_program(&self, source: String, options: Option<&str>) -> Result<ClProgram<Builded>, ClError> {
251        let unbuilded = ClProgram::<NotBuilded>::from_src(&self.context, source)?;
252        let devices = self.context.get_devices()?;
253        
254        let params = match options {
255            Some(opt) => ProgramParameters::default().custom(opt).get_parameters(),
256            None => ProgramParameters::default().get_parameters(),
257        };
258
259    unbuilded.build(&params, &devices)
260    }
261
262    /// Compiles the program or loads it from binary if available.
263    ///
264    /// Checks if binaries exist in `binary_dest_folder`. If so, loads them.
265    /// Otherwise, compiles from `src_path` and saves binaries to `binary_dest_folder`.
266    pub fn compile_or_binary(
267        &self,
268        src_path: &str,
269        binary_dest_folder: &str,
270        options: Option<&str>,
271    ) -> Result<ClProgram<Builded>, ClError> {
272        use std::fs;
273        use std::path::Path;
274        use std::io::Read;
275        use crate::error::wrapper_error::WrapperError;
276
277        let path = Path::new(src_path);
278        let file_stem = path.file_stem()
279            .and_then(|s| s.to_str())
280            .ok_or(ClError::Wrapper(WrapperError::FailedToConvertStrToCString))?; 
281
282        let devices = self.context.get_devices()?;
283        
284        let mut binaries: Vec<Vec<u8>> = Vec::new();
285        let mut use_binaries = true;
286
287        for (i, device) in devices.iter().enumerate() {
288            let device_name = device.get_name()?.replace(" ", "_");
289            let bin_filename = format!("{}_{}_{}.bin", file_stem, device_name, i);
290            let bin_path = Path::new(binary_dest_folder).join(bin_filename);
291
292            if bin_path.exists() {
293                match fs::read(&bin_path) {
294                    Ok(content) => binaries.push(content),
295                    Err(_) => {
296                        use_binaries = false;
297                        break;
298                    }
299                }
300            } else {
301                use_binaries = false;
302                break;
303            }
304        }
305
306        if use_binaries && binaries.len() == devices.len() {
307             let binary_slices: Vec<&[u8]> = binaries.iter().map(|b| b.as_slice()).collect();
308             
309             match ClProgram::<NotBuilded>::from_binary(&self.context, &devices, &binary_slices) {
310                 Ok(program) => {
311                    let params = match options {
312                        Some(opt) => ProgramParameters::default().custom(opt).get_parameters(),
313                        None => ProgramParameters::default().get_parameters(),
314                    };
315                    
316                    match program.build(&params, &devices) {
317                        Ok(built) => return Ok(built),
318                        Err(_) => {
319                            // Build from binary failed. Fallback to source.
320                        }
321                    }
322                 },
323                 Err(_) => {
324                     // Failed to create from binary. Fallback.
325                 }
326             }
327        }
328
329        // Compile from source
330        let mut src_content = String::new();
331        fs::File::open(src_path)
332            .map_err(|_| ClError::Wrapper(WrapperError::FileIOError))?
333            .read_to_string(&mut src_content)
334            .map_err(|_| ClError::Wrapper(WrapperError::FileIOError))?;
335
336        let built_program = self.build_program(src_content, options)?;
337
338        // Save binaries
339        let _ = built_program.save_binary(binary_dest_folder, file_stem);
340
341        Ok(built_program)
342    }
343
    /// Creates a Kernel from an already compiled program.
    /// The `name` must exactly match the name of the `kernel` function in your C code.
    ///
    /// Thin wrapper over `ClKernel::new`; its `ClError` is returned unchanged.
    pub fn create_kernel(&self, program: &ClProgram<Builded>, name: &str) -> Result<ClKernel, ClError> {
        ClKernel::new(program, name)
    }
349
350    /// Creates a memory Buffer on the GPU.
351    /// 
352    /// Buffers are "boxes" of data that the GPU can read or write.
353    pub fn create_buffer(&self, flags: &[MemoryFlags], size: usize, host_ptr: *mut c_void) -> Result<ClBuffer, ClError> {
354        ClBuffer::new(&self.context, &flags.to_vec(), size, host_ptr)
355    }
356
357    /// Creates an OpenCL Image (requires OpenCL 1.2+).
358    /// Images are optimized for 2D/3D access and filtering.
359    #[cfg(feature = "CL_VERSION_1_2")]
360    pub fn create_image(
361        &self, 
362        flags: &[MemoryFlags], 
363        format: &ClImageFormats, 
364        desc: &ClImageDesc, 
365        host_ptr: *mut c_void
366    ) -> Result<ClImage, ClError> {
367        ClImage::new(&self.context, &flags.to_vec(), format, desc, host_ptr)
368    }
369
370    /// Creates an SVM Buffer (Shared Virtual Memory). (Requires OpenCL 2.0+).
371    /// Allows sharing pointers directly between CPU and GPU without manual copies.
372    #[cfg(feature = "CL_VERSION_2_0")]
373    pub fn create_svm_buffer<T>(&self, flags: &[MemoryFlags], len: usize) -> Result<ClSvmBuffer<T>, ClError> {
374        ClSvmBuffer::<T>::new(&self.context, &flags.to_vec(), len, 0)
375    }
376
377    /// Reads data from a buffer to host memory.
378    /// Uses the most powerful GPU available to perform the copy.
379    pub async fn read_buffer<T: Sized>(
380        &self,
381        buffer: &ClBuffer,
382        host_memory: &mut [T],
383    ) -> Result<ClEvent, ClError> {
384        let queue = self.get_optimal_queue();
385        queue.enqueue_read_buffer(buffer, None, host_memory, None).await
386    }
387
388    /// Writes data from host memory to a buffer.
389    /// Uses the most powerful GPU available to perform the copy.
390    pub async fn write_buffer<T: Sized>(
391        &self,
392        buffer: &ClBuffer,
393        host_memory: &mut [T],
394    ) -> Result<ClEvent, ClError> {
395         let queue = self.get_optimal_queue();
396         let size = host_memory.len() * std::mem::size_of::<T>();
397         queue.write_buffer(buffer, host_memory.as_mut_ptr() as *mut c_void, 0, size, None).await
398    }
399    
400    /// Reads data from an image to host memory.
401    #[cfg(feature = "CL_VERSION_1_2")]
402    pub async fn read_image<T: Sized>(
403        &self,
404        image: &ClImage,
405        host_memory: &mut [T],
406        origin: [usize; 3],
407        region: [usize; 3],
408    ) -> Result<ClEvent, ClError> {
409        let queue = self.get_optimal_queue();
410        queue.read_image_raw(
411            image, 
412            origin, 
413            region, 
414            0, 
415            0, 
416            host_memory.as_mut_ptr() as *mut c_void, 
417            None
418        ).await
419    }
420
421    /// Writes data from host memory to an image.
422    #[cfg(feature = "CL_VERSION_1_2")]
423    pub async fn write_image<T: Sized>(
424        &self,
425         image: &ClImage,
426         host_memory: &mut [T],
427         origin: [usize; 3],
428         region: [usize; 3],
429    ) -> Result<ClEvent, ClError> {
430        let queue = self.get_optimal_queue();
431        queue.write_image_raw(
432            image,
433            origin,
434            region,
435            0,
436            0,
437            host_memory.as_mut_ptr() as *mut c_void,
438            None
439        ).await
440    }
441
442    //
443    //
444    // Utils
445    //
446    //
447    fn get_optimal_queue(&self) -> &ClCommandQueue {
448        let mut max_weight = 0;
449        let mut idx = 0;
450        for (i, &weight) in self.weights.iter().enumerate() {
451            if weight > max_weight {
452                max_weight = weight;
453                idx = i;
454            }
455        }
456        &self.queues[idx]
457    }
458
459    fn measure_platform_capacity(platform: &ClPlatform) -> Result<u64, ClError> {
460        let mut score: u64 = 0;
461
462        let devices = platform.get_all_devices()?;
463        for device in &devices {
464            score += Self::measure_device_capacity(device)?;
465        }
466        Ok(score)
467    }
468
469    fn measure_device_capacity(device: &ClDevice) -> Result<u64, ClError> {
470        let compute_units = device.get_max_compute_units()?;
471        let clock_frequency = device.get_max_clock_frequency()?;
472        let memory = device.get_global_mem_size()? / (1024 * 1024);
473        let device_type = device.get_device_type()?;
474
475        let mut score = ((compute_units as u64 * clock_frequency as u64) / 100) + (memory / 10);
476
477        // Core improvement: Prioritize GPUs heavily.
478        // A single weak GPU is usually better for OpenCL tasks than a powerful CPU
479        // due to architectural differences and driver optimizations.
480        if (device_type & cl3::device::CL_DEVICE_TYPE_GPU) != 0 {
481            score += 1_000_000;
482        } else if (device_type & cl3::device::CL_DEVICE_TYPE_ACCELERATOR) != 0 {
483            score += 500_000;
484        }
485
486        Ok(score)
487    }
488}
489
// These impls carry the same feature gate as the `AsyncExecutor` definition;
// without it the file would not compile when `CL_VERSION_1_1` is disabled,
// because the type would not exist.
//
// SAFETY: NOTE(review) — these impls assert that every field (the shared
// context, the command queues and the device handles) may be moved to and
// accessed from other threads. That presumably relies on the OpenCL runtime's
// thread-safety guarantees for these objects and on the wrapper types adding no
// unsynchronized interior mutability; confirm against the `cl_types` wrappers.
#[cfg(feature = "CL_VERSION_1_1")]
unsafe impl Sync for AsyncExecutor {}
#[cfg(feature = "CL_VERSION_1_1")]
unsafe impl Send for AsyncExecutor {}