mod task_builder;
mod kernel_arg;
use std::os::raw::c_void;
use std::sync::Arc;
use crate::{
async_executor::task_builder::TaskBuilder,
cl_types::{
cl_buffer::ClBuffer,
cl_command_queue::{ClCommandQueue, command_queue_parameters::{CommandQueueProperties, Version20}},
cl_context::ClContext,
cl_device::ClDevice,
cl_platform::ClPlatform,
cl_kernel::ClKernel,
cl_device::opencl_version::OpenCLVersion,
cl_program::{ClProgram, Builded, NotBuilded, program_parameters::ProgramParameters},
cl_image::{ClImage, image_desc::ClImageDesc, image_formats::ClImageFormats},
cl_svm_buffer::ClSvmBuffer,
memory_flags::MemoryFlags,
},
error::ClError
};
/// Executor that holds one shared OpenCL context plus one command queue per device.
///
/// The constructors fill `queues`, `weights` and `device_versions` in the same
/// per-device loop, so index `i` in each vector refers to the same device.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct AsyncExecutor {
// Context created from the full device list; shared via `Arc` (see `get_context`).
context: Arc<ClContext>,
// One command queue per device, in the order the devices were supplied.
queues: Vec<ClCommandQueue>,
// Per-device capacity score as computed by `measure_device_capacity`.
weights: Vec<u64>,
// Value of the `profiling_enabled` option the executor was constructed with.
profiling_enabled: bool,
// OpenCL version reported by each device at construction time.
device_versions: Vec<OpenCLVersion>,
}
#[cfg(feature = "CL_VERSION_1_1")]
impl AsyncExecutor {
    /// Builds an executor on the highest-scoring platform with profiling disabled.
    ///
    /// Shorthand for [`Self::new_best_platform_with_options`] called with `false`.
    ///
    /// # Errors
    /// Propagates every error of [`Self::new_best_platform_with_options`].
    pub fn new_best_platform() -> Result<Self, ClError> {
        Self::new_best_platform_with_options(false)
    }

    /// Builds an executor over all devices of the highest-scoring platform.
    ///
    /// Every platform returned by `ClPlatform::get_all` is scored with
    /// `measure_platform_capacity`; the first platform holding the strictly
    /// greatest score wins (the first platform wins when all scores are zero).
    /// The executor itself is then built by
    /// [`Self::new_from_devices_with_options`].
    ///
    /// # Errors
    /// * `WrapperError::PlatformsNotFound` when no platform is available.
    /// * Any error raised while listing platforms or devices, while scoring a
    ///   platform (a single failing platform aborts the whole selection), or
    ///   while building the executor.
    pub fn new_best_platform_with_options(profiling_enabled: bool) -> Result<Self, ClError> {
        let platforms = ClPlatform::get_all()?;
        let scores = platforms
            .iter()
            .map(Self::measure_platform_capacity)
            .collect::<Result<Vec<u64>, ClError>>()?;

        // With an empty platform list the index is 0 and `get` yields `None`,
        // which is reported as `PlatformsNotFound`.
        let devices = platforms
            .get(Self::best_score_index(&scores))
            .ok_or_else(|| {
                ClError::Wrapper(crate::error::wrapper_error::WrapperError::PlatformsNotFound)
            })?
            .get_all_devices()?;

        Self::new_from_devices_with_options(&devices, profiling_enabled)
    }

    /// Builds an executor over every device of every available platform,
    /// with profiling disabled.
    ///
    /// # Errors
    /// Propagates errors from listing the platforms and from
    /// [`Self::new_from_platforms`].
    pub fn new_all_platforms() -> Result<Self, ClError> {
        let platforms = ClPlatform::get_all()?;
        Self::new_from_platforms(&platforms)
    }

    /// Builds an executor over all devices of the given `platforms`,
    /// with profiling disabled.
    ///
    /// Devices are gathered platform by platform, preserving the order of
    /// `platforms`, and placed into a single context.
    ///
    /// # Errors
    /// Propagates errors from listing a platform's devices and from
    /// [`Self::new_from_devices`].
    pub fn new_from_platforms(platforms: &[ClPlatform]) -> Result<Self, ClError> {
        let mut all_devices = Vec::new();
        for platform in platforms {
            all_devices.extend(platform.get_all_devices()?);
        }
        Self::new_from_devices(&all_devices)
    }

    /// Builds an executor over `devices` with profiling disabled.
    ///
    /// # Errors
    /// Propagates every error of [`Self::new_from_devices_with_options`].
    pub fn new_from_devices(devices: &[ClDevice]) -> Result<Self, ClError> {
        Self::new_from_devices_with_options(devices, false)
    }

    /// Builds an executor over `devices`.
    ///
    /// One context is created for the whole device list; then, for each device
    /// in order, a command queue is created, its capacity score is measured and
    /// its OpenCL version is recorded.
    ///
    /// `profiling_enabled` requests the profiling flag on every queue.
    ///
    /// # Errors
    /// Propagates errors from context creation, queue creation and the device
    /// queries used for the capacity score.
    pub fn new_from_devices_with_options(devices: &[ClDevice], profiling_enabled: bool) -> Result<Self, ClError> {
        let devices_vec = devices.to_vec();
        let context = Arc::new(ClContext::new(&devices_vec)?);

        let mut queues = Vec::with_capacity(devices.len());
        let mut weights = Vec::with_capacity(devices.len());
        let mut device_versions = Vec::with_capacity(devices.len());
        for device in devices {
            let version = device.get_opencl_version();
            queues.push(Self::create_device_queue(&context, device, &version, profiling_enabled)?);
            weights.push(Self::measure_device_capacity(device)?);
            device_versions.push(version);
        }

        Ok(Self {
            context,
            queues,
            weights,
            device_versions,
            profiling_enabled,
        })
    }

    /// Returns the `profiling_enabled` option this executor was built with.
    pub fn is_profiling_enabled(&self) -> bool {
        self.profiling_enabled
    }

    /// Returns a new `Arc` handle to the shared context.
    pub fn get_context(&self) -> Arc<ClContext> {
        Arc::clone(&self.context)
    }

    /// Returns the OpenCL version recorded for each device at construction,
    /// in the same order as [`Self::get_queues`].
    pub fn get_device_versions(&self) -> &[OpenCLVersion] {
        &self.device_versions
    }

    /// Queries the context for its devices.
    ///
    /// # Errors
    /// Propagates the error of `ClContext::get_devices`.
    pub fn get_devices(&self) -> Result<Vec<ClDevice>, ClError> {
        self.context.get_devices()
    }

    /// Returns the command queues, one per device.
    pub fn get_queues(&self) -> &[ClCommandQueue] {
        &self.queues
    }

    /// Starts a [`TaskBuilder`] for `kernel` that borrows this executor.
    pub fn create_task(&self, kernel: ClKernel) -> TaskBuilder<'_> {
        TaskBuilder::new(self, kernel)
    }

    /// Creates a program from `source` and builds it for all devices of the context.
    ///
    /// When `options` is `Some`, the string is added to the default
    /// `ProgramParameters` through `custom`; otherwise the defaults are used as is.
    ///
    /// # Errors
    /// Propagates errors from program creation, the device query and the build.
    pub fn build_program(&self, source: String, options: Option<&str>) -> Result<ClProgram<Builded>, ClError> {
        let unbuilded = ClProgram::<NotBuilded>::from_src(&self.context, source)?;
        let devices = self.context.get_devices()?;
        let params = match options {
            Some(opt) => ProgramParameters::default().custom(opt).get_parameters(),
            None => ProgramParameters::default().get_parameters(),
        };
        unbuilded.build(&params, &devices)
    }

    /// Creates the kernel called `name` from a built `program`.
    ///
    /// # Errors
    /// Propagates the error of `ClKernel::new`.
    pub fn create_kernel(&self, program: &ClProgram<Builded>, name: &str) -> Result<ClKernel, ClError> {
        ClKernel::new(program, name)
    }

    /// Creates a buffer in the executor's context.
    ///
    /// `flags`, `size` and `host_ptr` are forwarded unchanged to `ClBuffer::new`.
    /// NOTE(review): `host_ptr` is a raw pointer; its validity requirements are
    /// defined by `ClBuffer::new` — confirm there before passing non-null values.
    ///
    /// # Errors
    /// Propagates the error of `ClBuffer::new`.
    pub fn create_buffer(&self, flags: &[MemoryFlags], size: usize, host_ptr: *mut c_void) -> Result<ClBuffer, ClError> {
        ClBuffer::new(&self.context, &flags.to_vec(), size, host_ptr)
    }

    /// Creates an image in the executor's context.
    ///
    /// All arguments are forwarded unchanged to `ClImage::new`.
    /// NOTE(review): `host_ptr` is a raw pointer; its validity requirements are
    /// defined by `ClImage::new` — confirm there before passing non-null values.
    ///
    /// # Errors
    /// Propagates the error of `ClImage::new`.
    #[cfg(feature = "CL_VERSION_1_2")]
    pub fn create_image(
        &self,
        flags: &[MemoryFlags],
        format: &ClImageFormats,
        desc: &ClImageDesc,
        host_ptr: *mut c_void,
    ) -> Result<ClImage, ClError> {
        ClImage::new(&self.context, &flags.to_vec(), format, desc, host_ptr)
    }

    /// Creates an SVM buffer of `T` in the executor's context.
    ///
    /// `flags` and `len` are forwarded unchanged to `ClSvmBuffer::new`.
    /// NOTE(review): the trailing `0` is presumably the alignment argument
    /// (0 = implementation default) — confirm against `ClSvmBuffer::new`.
    ///
    /// # Errors
    /// Propagates the error of `ClSvmBuffer::new`.
    #[cfg(feature = "CL_VERSION_2_0")]
    pub fn create_svm_buffer<T>(&self, flags: &[MemoryFlags], len: usize) -> Result<ClSvmBuffer<T>, ClError> {
        ClSvmBuffer::<T>::new(&self.context, &flags.to_vec(), len, 0)
    }

    /// Returns the index of the first strictly greatest score, or 0 when
    /// `scores` is empty or contains only zeros.
    fn best_score_index(scores: &[u64]) -> usize {
        let mut best_index = 0;
        let mut best_score = 0;
        for (index, &score) in scores.iter().enumerate() {
            // Strict comparison keeps the earliest platform on ties.
            if score > best_score {
                best_index = index;
                best_score = score;
            }
        }
        best_index
    }

    /// Creates the command queue for one `device` inside `context`.
    ///
    /// Devices reporting OpenCL 2.0 or newer get a queue created from a
    /// properties list; older devices go through the deprecated
    /// bit-field based call.
    fn create_device_queue(
        context: &Arc<ClContext>,
        device: &ClDevice,
        version: &OpenCLVersion,
        profiling_enabled: bool,
    ) -> Result<ClCommandQueue, ClError> {
        if *version >= OpenCLVersion::V2_0 {
            // Best effort: a failed capability query is treated as
            // "out-of-order execution not supported" instead of an error.
            let supports_out_of_order = match device.get_queue_on_host_properties() {
                Ok(host_props) => {
                    (host_props as u64 & cl3::command_queue::CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE) != 0
                }
                Err(_) => false,
            };
            let properties = CommandQueueProperties::<Version20>::new()
                .set_cl_queue_properties(supports_out_of_order, profiling_enabled, false, false)
                .get_properties();
            Ok(ClCommandQueue::create_command_queue_with_properties(context, device, &properties)?)
        } else {
            let properties = if profiling_enabled {
                cl3::command_queue::CL_QUEUE_PROFILING_ENABLE
            } else {
                0
            };
            // The deprecated entry point is the one used for devices older than OpenCL 2.0.
            #[allow(deprecated)]
            let queue = ClCommandQueue::create_command_queue(context, device, properties)?;
            Ok(queue)
        }
    }

    /// Sums the capacity scores of all devices of `platform`.
    ///
    /// # Errors
    /// Propagates errors from listing the devices and from scoring any of them.
    fn measure_platform_capacity(platform: &ClPlatform) -> Result<u64, ClError> {
        let devices = platform.get_all_devices()?;
        let mut score: u64 = 0;
        for device in &devices {
            score += Self::measure_device_capacity(device)?;
        }
        Ok(score)
    }

    /// Computes a heuristic capacity score for `device`:
    /// `(compute_units * clock_frequency) / 100 + (global_mem_size / 1 MiB) / 10`,
    /// using integer arithmetic throughout.
    ///
    /// # Errors
    /// Propagates errors from the three device queries.
    fn measure_device_capacity(device: &ClDevice) -> Result<u64, ClError> {
        let compute_units = device.get_max_compute_units()?;
        let clock_frequency = device.get_max_clock_frequency()?;
        // Divides the reported global memory size by 1024 * 1024 before weighting it.
        let memory = device.get_global_mem_size()? / (1024 * 1024);
        Ok(((compute_units as u64 * clock_frequency as u64) / 100) + (memory / 10))
    }
}