extern crate chrono;
extern crate futures;
extern crate futures_cpupool;
extern crate ocl;
#[macro_use]
extern crate colorify;
use futures::{stream, Future, Stream};
use futures_cpupool::CpuPool;
use ocl::flags::{CommandQueueProperties, MapFlags, MemFlags};
use ocl::prm::Float4;
use ocl::{Buffer, Context, Device, Event, Kernel, Platform, Program, Queue, Result as OclResult};
use std::cell::Cell;
use std::collections::VecDeque;
// OpenCL kernel source: element-wise `out[idx] = in[idx] + values` over
// float4 vectors, one work-item per element (global id indexes the buffers).
static KERN_SRC: &'static str = r#"
__kernel void add(
__global float4* in,
float4 values,
__global float4* out)
{
uint idx = get_global_id(0);
out[idx] = in[idx] + values;
}
"#;
/// Formats a `chrono::Duration` as `"S.mmm seconds"`.
///
/// Fix: the millisecond remainder is zero-padded to three digits; the old
/// `"{}.{}"` form printed e.g. 1 s + 5 ms as "1.5 seconds" instead of
/// "1.005 seconds".
fn fmt_duration(duration: chrono::Duration) -> String {
let el_sec = duration.num_seconds();
// Milliseconds left over after whole seconds (0..=999 for the non-negative
// elapsed durations produced by this program).
let el_ms = duration.num_milliseconds() - (el_sec * 1000);
format!("{}.{:03} seconds", el_sec, el_ms)
}
/// Runs `task_count` independent OpenCL tasks, each chained entirely through
/// events and futures (no blocking waits): fill the input buffer with a
/// sentinel, map it and write input values on a thread-pool thread, run the
/// `add` kernel on the device, then map the output buffer and verify every
/// value on the thread pool.
///
/// Returns `Ok(())` when every task's results verify, otherwise the first
/// OpenCL or verification error encountered.
pub fn async_process() -> OclResult<()> {
let start_time = chrono::Local::now();
let platform = Platform::default();
printlnc!(blue: "Platform: {}", platform.name()?);
let device = Device::first(platform)?;
printlnc!(teal: "Device: {} {}", device.vendor()?, device.name()?);
let context = Context::builder()
.platform(platform)
.devices(device)
.build()?;
// Request out-of-order queues so independent commands may overlap; each
// queue falls back to the default (None) properties if the device rejects
// the out-of-order property.
let queue_flags = Some(CommandQueueProperties::new().out_of_order());
// Separate queues for host writes, host reads, and kernel execution.
let write_queue = Queue::new(&context, device, queue_flags)
.or_else(|_| Queue::new(&context, device, None))?;
let read_queue = Queue::new(&context, device, queue_flags)
.or_else(|_| Queue::new(&context, device, None))?;
let kern_queue = Queue::new(&context, device, queue_flags)
.or_else(|_| Queue::new(&context, device, None))?;
// Thread pool runs the host-side map/write and map/verify closures.
let thread_pool = CpuPool::new_num_cpus();
let task_count = 12;
// Host loops over each mapped buffer this many times — artificial extra CPU
// work per task (presumably to make device/host overlap visible — TODO
// confirm intent). The verify step counts each value once per repetition.
let redundancy_count = 2000;
let mut offloads = VecDeque::with_capacity(task_count);
println!("Creating and enqueuing tasks...");
for task_id in 0..task_count {
let work_size = 1 << 14;
// Host only ever writes the input buffer and only reads the output buffer.
let write_buf_flags = MemFlags::new().read_only().host_write_only();
let read_buf_flags = MemFlags::new().write_only().host_read_only();
let write_buf: Buffer<Float4> = Buffer::builder()
.queue(write_queue.clone())
.flags(write_buf_flags)
.len(work_size)
.build()?;
let read_buf: Buffer<Float4> = Buffer::builder()
.queue(read_queue.clone())
.flags(read_buf_flags)
.len(work_size)
.build()?;
// NOTE(review): the program is rebuilt from source on every loop iteration;
// it looks hoistable above the loop, but is left as-is here.
let program = Program::builder()
.devices(device)
.src(KERN_SRC)
.build(&context)?;
// Kernel computes out[idx] = in[idx] + values, with values = 100.0 per lane.
let kern = Kernel::builder()
.name("add")
.program(&program)
.queue(kern_queue.clone())
.global_work_size(work_size)
.arg(&write_buf)
.arg(Float4::new(100., 100., 100., 100.))
.arg(&read_buf)
.build()?;
// (0) Fill the input buffer with a -999.0 sentinel so stale data would be
// detectable; `fill_event` gates the later unmap.
let mut fill_event = Event::empty();
write_buf
.cmd()
.fill(Float4::new(-999., -999., -999., -999.), None)
.enew(&mut fill_event)
.enq()?;
// (1) Asynchronously map the input buffer for writing.
// `write_invalidate_region` tells the runtime the host will overwrite the
// whole mapped region, so no device-to-host copy is needed first.
// SAFETY: relies on the mapped region not being accessed by the device
// while mapped — ordering is enforced via the events below.
let mut future_write_data = unsafe {
write_buf
.cmd()
.map()
.flags(MapFlags::new().write_invalidate_region())
.enq_async()?
};
// The unmap (flush back to device) must wait for the fill to complete.
future_write_data.set_unmap_wait_events(&fill_event);
let write_unmap_event = future_write_data.create_unmap_event()?.clone();
// (2) On the thread pool: overwrite every element with 50.0 (repeated
// `redundancy_count` times as busywork), then resolve to the task id.
let write = future_write_data.and_then(move |mut data| {
for _ in 0..redundancy_count {
for val in data.iter_mut() {
*val = Float4::new(50., 50., 50., 50.);
}
}
println!("Mapped write complete (task: {}). ", task_id);
Ok(task_id)
});
let spawned_write = thread_pool.spawn(write);
// (3) Enqueue the kernel: waits on the write-unmap event, signals
// `kern_event` on completion.
// SAFETY: kernel argument buffers must outlive execution; both buffers live
// past the blocking stream drive below.
let mut kern_event = Event::empty();
unsafe {
kern.cmd()
.enew(&mut kern_event)
.ewait(&write_unmap_event)
.enq()?;
}
// (4) Map the output buffer for reading once the kernel has finished.
let future_read_data = unsafe {
read_buf
.cmd()
.map()
.flags(MapFlags::new().read())
.ewait(&kern_event)
.enq_async()?
};
// (5) On the thread pool: verify every element equals 50 + 100 = 150 in all
// four lanes, counting each checked value (once per redundancy repetition).
let read = future_read_data.and_then(move |data| {
let mut val_count = 0usize;
for _ in 0..redundancy_count {
for val in data.iter() {
let correct_val = Float4::new(150., 150., 150., 150.);
if *val != correct_val {
return Err(format!(
"Result value mismatch: {:?} != {:?}",
val, correct_val
)
.into());
}
val_count += 1;
}
}
println!("Mapped read and verify complete (task: {}). ", task_id);
Ok(val_count)
});
let spawned_read = thread_pool.spawn(read);
// A task completes when both its write and read/verify futures resolve.
let offload = spawned_write.join(spawned_read);
offloads.push_back(offload);
}
println!("Running tasks...");
let create_duration = chrono::Local::now() - start_time;
let correct_val_count = Cell::new(0usize);
// Drive all tasks to completion in whichever order they finish, tallying
// verified values (`Cell` allows mutation from inside the `for_each`
// closure; everything here runs on this thread via `wait()`).
stream::futures_unordered(offloads)
.for_each(|(task_id, val_count)| {
correct_val_count.set(correct_val_count.get() + val_count);
println!("Task: {} has completed.", task_id);
Ok(())
})
.wait()?;
let run_duration = chrono::Local::now() - start_time - create_duration;
let total_duration = chrono::Local::now() - start_time;
// Divide by redundancy_count so the report shows unique result values, not
// values multiplied by host-loop repetitions.
printlnc!(yellow_bold: "All {} (float4) result values are correct! \n\
Durations => | Create/Enqueue: {} | Run: {} | Total: {} |",
correct_val_count.get() / redundancy_count, fmt_duration(create_duration),
fmt_duration(run_duration), fmt_duration(total_duration));
Ok(())
}
/// Entry point: run the asynchronous OpenCL pipeline and report any failure.
pub fn main() {
// Only the failure case needs handling; `if let Err` replaces the match
// with an empty `Ok` arm. Errors go to stderr rather than stdout so they
// are not interleaved with the program's normal progress output.
if let Err(err) = async_process() {
eprintln!("{}", err);
}
}