use crate::{
async_executor::{kernel_arg::KernelArg, AsyncExecutor, task_builder::{TaskReport, OutputRead}},
cl_types::{
cl_event::ClEvent,
cl_kernel::ClKernel,
cl_buffer::ClBuffer,
cl_image::ClImage,
cl_svm_buffer::ClSvmBuffer,
},
error::ClError,
};
use std::os::raw::c_void;
use futures;
/// One kernel launch inside a pipeline, as assembled by [`StageBuilder`].
///
/// The launch geometry is always stored as three dimensions (x, y, z); the
/// pipeline trims it to the dimensions actually in use when enqueueing.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct PipelineStage<'a> {
    /// Kernel launched by this stage.
    pub(crate) kernel: ClKernel,
    /// Arguments bound to `kernel` before the stage is enqueued.
    pub(crate) kernel_args: Vec<KernelArg<'a>>,
    /// Global offset per dimension (x, y, z).
    pub(crate) global_work_offset: [usize; 3],
    /// Global size per dimension (x, y, z); x is the dimension split across queues.
    pub(crate) global_work_dims: [usize; 3],
    /// Optional work-group size; when `None`, no local size is passed to the enqueue call.
    pub(crate) local_work_dims: Option<[usize; 3]>,
}
/// Result of running a pipeline: one [`TaskReport`] per stage, in the order
/// the stages were executed.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct PipelineReport {
    /// Per-stage reports, pushed in stage execution order.
    pub stage_reports: Vec<TaskReport>,
}

// Gated like the struct above: without this the impl would refer to a type
// that does not exist when `CL_VERSION_1_1` is disabled.
#[cfg(feature = "CL_VERSION_1_1")]
impl PipelineReport {
    /// Sum of `total_kernel_duration_ns()` over every stage report.
    ///
    /// NOTE(review): stage reports only carry kernel events when profiling is
    /// enabled, so this is presumably 0 otherwise — confirm against `TaskReport`.
    pub fn total_kernel_duration_ns(&self) -> u64 {
        self.stage_reports
            .iter()
            .map(|stage| stage.total_kernel_duration_ns())
            .sum()
    }
}
/// Builder for a sequence of kernel stages executed on an [`AsyncExecutor`],
/// optionally followed by reads of device buffers back into host memory.
#[cfg(feature = "CL_VERSION_1_1")]
pub struct PipelineBuilder<'a> {
    /// Executor whose queues and weights are used to distribute each stage.
    async_executor: &'a AsyncExecutor,
    /// Snapshot of the executor's profiling flag, taken when the builder is created.
    profiling_enabled: bool,
    /// Stages to run, in insertion order.
    stages: Vec<PipelineStage<'a>>,
    /// Reads performed after the last stage.
    final_reads: Vec<OutputRead<'a>>,
}
// Gated like `PipelineBuilder` itself so the file compiles with the feature off.
#[cfg(feature = "CL_VERSION_1_1")]
impl<'a> PipelineBuilder<'a> {
    /// Creates an empty pipeline that will run on `async_executor`.
    ///
    /// Whether kernel events are kept in the stage reports is decided once,
    /// here, from `async_executor.is_profiling_enabled()`.
    pub fn new(async_executor: &'a AsyncExecutor) -> Self {
        Self {
            async_executor,
            stages: Vec::new(),
            final_reads: Vec::new(),
            profiling_enabled: async_executor.is_profiling_enabled(),
        }
    }

    /// Starts describing a new stage that launches `kernel` over an
    /// `x * y * z` global range with a zero global offset.
    ///
    /// Configure the stage through the returned [`StageBuilder`], then call
    /// [`StageBuilder::finish`] to append it and get this builder back.
    pub fn add_stage(self, kernel: ClKernel, x: usize, y: usize, z: usize) -> StageBuilder<'a> {
        StageBuilder {
            pipeline_builder: self,
            kernel,
            kernel_args: Vec::new(),
            global_work_dims: [x, y, z],
            global_work_offset: [0, 0, 0],
            local_work_dims: None,
        }
    }

    /// Schedules a read of `buffer` into `host_memory` after the last stage.
    ///
    /// Only a raw pointer to `host_memory` is recorded, and it is written when
    /// [`run`](Self::run) executes. The slice is therefore borrowed for `'a`,
    /// which keeps it alive and un-aliased until the pipeline is consumed.
    /// The read size is `host_memory.len() * size_of::<T>()` bytes.
    pub fn read_buffer<T>(mut self, buffer: &'a ClBuffer, host_memory: &'a mut [T]) -> Self {
        self.final_reads.push(OutputRead::Buffer {
            buffer,
            host_ptr: host_memory.as_mut_ptr() as *mut c_void,
            size: host_memory.len() * std::mem::size_of::<T>(),
        });
        self
    }

    /// Enqueues every stage in insertion order, then performs the final reads.
    ///
    /// Each stage waits on the chunk events of the most recent stage that
    /// enqueued any work. The final reads wait on those same events. This
    /// ordering is maintained whether or not profiling is enabled.
    ///
    /// # Errors
    /// Propagates the first `ClError` returned while binding arguments,
    /// enqueueing a kernel chunk, or performing a read.
    pub async fn run(mut self) -> Result<PipelineReport, ClError> {
        let stages = std::mem::take(&mut self.stages);
        let mut report = PipelineReport { stage_reports: Vec::with_capacity(stages.len()) };
        // Dependency events are tracked separately from the report, which
        // only holds events when profiling is on.
        let mut last_events: Option<Vec<ClEvent>> = None;
        for stage in stages {
            let (stage_report, events) = self.run_stage(stage, last_events.clone()).await?;
            // A stage that enqueued nothing must not drop the dependency on
            // the stage before it; also avoid passing an empty wait list.
            if !events.is_empty() {
                last_events = Some(events);
            }
            report.stage_reports.push(stage_report);
        }
        if !self.final_reads.is_empty() {
            let queue = self.async_executor.get_optimal_queue();
            for read in self.final_reads {
                match read {
                    OutputRead::Buffer { buffer, host_ptr, size } => {
                        queue.enqueue_read_buffer_raw(buffer, None, host_ptr, size, last_events.clone()).await?;
                    }
                    #[cfg(feature = "CL_VERSION_1_2")]
                    OutputRead::Image { image, host_ptr, origin, region } => {
                        queue.read_image_raw(image, origin, region, 0, 0, host_ptr, last_events.clone()).await?;
                    }
                }
            }
        }
        Ok(report)
    }

    /// Enqueues one stage, splitting its x dimension across all executor queues
    /// in proportion to the executor weights.
    ///
    /// Returns the stage report and the events of every enqueued chunk. The
    /// report's `kernel_execution_events` are only filled when profiling is
    /// enabled. The returned events are always filled so callers can order
    /// later work after this stage.
    async fn run_stage(
        &self,
        stage: PipelineStage<'a>,
        wait_list: Option<Vec<ClEvent>>,
    ) -> Result<(TaskReport, Vec<ClEvent>), ClError> {
        let executor = self.async_executor;
        let num_queues = executor.queues.len();
        let mut report = TaskReport::new();
        if num_queues == 0 {
            return Ok((report, Vec::new()));
        }
        let last_queue = num_queues - 1;

        // Argument values are identical for every chunk, so bind them once;
        // each per-queue clone below is taken from the already-bound kernel.
        let kernel = stage.kernel.clone();
        Self::bind_kernel_args(&kernel, &stage.kernel_args)?;

        let work_dim = Self::effective_work_dim(&stage);
        let total_work = stage.global_work_dims[0];
        let end = stage.global_work_offset[0] + total_work;
        let total_weight: u64 = executor.weights.iter().sum();
        // Non-final chunks are kept multiples of the local x size: OpenCL 1.x
        // requires each chunk's global size to be divisible by the local size.
        let granularity = stage.local_work_dims.map_or(1, |local| local[0].max(1));
        let local_dims = stage
            .local_work_dims
            .map_or_else(Vec::new, |local| local[..work_dim].to_vec());

        let mut next_offset = stage.global_work_offset[0];
        let mut pending = Vec::with_capacity(num_queues);
        for i in 0..num_queues {
            let chunk = if i == last_queue {
                // The last queue takes whatever the rounded-down shares left over.
                end - next_offset
            } else {
                let share = if total_weight == 0 {
                    // All weights zero: fall back to an even split instead of dividing by 0.
                    total_work / num_queues
                } else {
                    ((total_work as u128 * executor.weights[i] as u128) / total_weight as u128) as usize
                };
                share - share % granularity
            };
            // A zero global size is rejected by OpenCL, so never enqueue one.
            if chunk == 0 {
                continue;
            }
            let offsets = [next_offset, stage.global_work_offset[1], stage.global_work_offset[2]][..work_dim].to_vec();
            let sizes = [chunk, stage.global_work_dims[1], stage.global_work_dims[2]][..work_dim].to_vec();
            let locals = local_dims.clone();
            let queue = executor.queues[i].clone();
            let chunk_kernel = kernel.clone();
            let waits = wait_list.clone();
            pending.push(async move {
                queue.enqueue_nd_range_kernel(
                    &chunk_kernel,
                    work_dim as u32,
                    offsets,
                    sizes,
                    locals,
                    None,
                    waits
                ).await
            });
            next_offset += chunk;
        }

        let mut events = Vec::with_capacity(pending.len());
        for result in futures::future::join_all(pending).await {
            let event = result?;
            if self.profiling_enabled {
                report.kernel_execution_events.push(event.clone());
            }
            events.push(event);
        }
        Ok((report, events))
    }

    /// Number of NDRange dimensions needed for `stage`. A dimension counts if
    /// its global size is greater than one or its offset is non-zero; higher
    /// dimensions imply all lower ones.
    fn effective_work_dim(stage: &PipelineStage<'_>) -> usize {
        if stage.global_work_dims[2] > 1 || stage.global_work_offset[2] > 0 {
            3
        } else if stage.global_work_dims[1] > 1 || stage.global_work_offset[1] > 0 {
            2
        } else {
            1
        }
    }

    /// Binds every recorded argument of a stage to `kernel`, in order.
    fn bind_kernel_args(kernel: &ClKernel, args: &[KernelArg<'_>]) -> Result<(), ClError> {
        for arg in args {
            match arg {
                KernelArg::Scalar { arg_index, arg } => {
                    // SAFETY: `arg` is an owned byte buffer and `arg.len()` is its exact length.
                    unsafe { kernel.set_args(*arg_index, arg.len(), arg.as_ptr() as *const _)?; }
                }
                KernelArg::Buffer { arg_index, arg } => {
                    let handle = arg.as_ptr();
                    // SAFETY: points at a live local handle; the size is taken from the
                    // handle itself rather than assuming 8-byte pointers.
                    unsafe { kernel.set_args(*arg_index, std::mem::size_of_val(&handle), &handle as *const _ as *const _)?; }
                }
                #[cfg(feature = "CL_VERSION_1_2")]
                KernelArg::Image { arg_index, arg } => {
                    let handle = arg.as_ptr();
                    // SAFETY: as for buffers — pointer to a live local handle of its own size.
                    unsafe { kernel.set_args(*arg_index, std::mem::size_of_val(&handle), &handle as *const _ as *const _)?; }
                }
                #[cfg(feature = "CL_VERSION_2_0")]
                KernelArg::Svm { arg_index, arg, len: _ } => {
                    // NOTE(review): meaning of the `8` parameter depends on `set_svm_arg`; kept as-is.
                    unsafe { kernel.set_svm_arg(*arg_index, 8, *arg)?; }
                }
                #[cfg(feature = "CL_VERSION_2_0")]
                KernelArg::Pipe { arg_index, arg } => {
                    let handle = arg.as_ptr();
                    // SAFETY: as for buffers — pointer to a live local handle of its own size.
                    unsafe { kernel.set_args(*arg_index, std::mem::size_of_val(&handle), &handle as *const _ as *const _)?; }
                }
            }
        }
        Ok(())
    }
}
/// Fluent builder for one pipeline stage, obtained from
/// [`PipelineBuilder::add_stage`].
///
/// It owns the parent [`PipelineBuilder`]; [`StageBuilder::finish`] appends
/// the stage and hands the parent back.
// Gated like `PipelineBuilder`, which this struct embeds.
#[cfg(feature = "CL_VERSION_1_1")]
#[must_use = "call `finish()` to append the stage and get the pipeline builder back"]
pub struct StageBuilder<'a> {
    /// Parent pipeline, returned by `finish`.
    pipeline_builder: PipelineBuilder<'a>,
    /// Kernel the stage launches.
    kernel: ClKernel,
    /// Arguments recorded so far, in call order.
    kernel_args: Vec<KernelArg<'a>>,
    /// Global size per dimension (x, y, z).
    global_work_dims: [usize; 3],
    /// Global offset per dimension (x, y, z); zero unless overridden.
    global_work_offset: [usize; 3],
    /// Optional work-group size.
    local_work_dims: Option<[usize; 3]>,
}
// Gated like `StageBuilder`/`PipelineBuilder` so the file compiles with the feature off.
#[cfg(feature = "CL_VERSION_1_1")]
impl<'a> StageBuilder<'a> {
    /// Binds `buffer` to kernel argument `index`.
    pub fn arg_buffer(mut self, index: u32, buffer: &'a ClBuffer) -> Self {
        self.kernel_args.push(KernelArg::Buffer { arg_index: index, arg: buffer });
        self
    }

    /// Binds a copy of the raw bytes of `scalar` to kernel argument `index`.
    ///
    /// The bytes are captured immediately, so `scalar` is not borrowed beyond
    /// this call.
    pub fn arg_scalar<T>(mut self, index: u32, scalar: T) -> Self {
        let size = std::mem::size_of::<T>();
        // SAFETY: the pointer comes from a live local of type `T` and the slice
        // covers exactly `size_of::<T>()` bytes of it. NOTE(review): `T` should be
        // plain data without padding — reading padding bytes as `u8` is UB.
        let bytes = unsafe { std::slice::from_raw_parts(&scalar as *const T as *const u8, size) };
        let arg = bytes.to_vec();
        self.kernel_args.push(KernelArg::Scalar { arg_index: index, arg });
        self
    }

    /// Binds `image` to kernel argument `index`.
    #[cfg(feature = "CL_VERSION_1_2")]
    pub fn arg_image(mut self, index: u32, image: &'a ClImage) -> Self {
        self.kernel_args.push(KernelArg::Image { arg_index: index, arg: image });
        self
    }

    /// Binds the SVM pointer of `buffer` (and records its length) to kernel argument `index`.
    #[cfg(feature = "CL_VERSION_2_0")]
    pub fn arg_svm<T>(mut self, index: u32, buffer: &'a ClSvmBuffer<T>) -> Self {
        self.kernel_args.push(KernelArg::Svm { arg_index: index, arg: buffer.as_ptr(), len: buffer.len });
        self
    }

    /// Sets the work-group size (x, y, z); only the dimensions in use are passed on.
    pub fn local_work_dims(mut self, x: usize, y: usize, z: usize) -> Self {
        self.local_work_dims = Some([x, y, z]);
        self
    }

    /// Sets the global offset (x, y, z); defaults to zero.
    pub fn global_work_offset(mut self, x: usize, y: usize, z: usize) -> Self {
        self.global_work_offset = [x, y, z];
        self
    }

    /// Appends the configured stage to the pipeline and returns the pipeline builder.
    pub fn finish(self) -> PipelineBuilder<'a> {
        let StageBuilder {
            mut pipeline_builder,
            kernel,
            kernel_args,
            global_work_dims,
            global_work_offset,
            local_work_dims,
        } = self;
        pipeline_builder.stages.push(PipelineStage {
            kernel,
            kernel_args,
            global_work_dims,
            global_work_offset,
            local_work_dims,
        });
        pipeline_builder
    }
}