ocl 0.19.7

OpenCL bindings and interfaces for Rust.
Documentation
//! Use a thread pool to offload host pre- and post-processing on multiple
//! asynchronous tasks.
//!
//!
//!

extern crate chrono;
extern crate futures;
extern crate futures_cpupool;
extern crate ocl;
#[macro_use]
extern crate colorify;

use futures::{stream, Future, Stream};
use futures_cpupool::CpuPool;
use ocl::flags::{CommandQueueProperties, MapFlags, MemFlags};
use ocl::prm::Float4;
use ocl::{Buffer, Context, Device, Event, Kernel, Platform, Program, Queue, Result as OclResult};
use std::cell::Cell;
use std::collections::VecDeque;

static KERN_SRC: &'static str = r#"
    __kernel void add(
            __global float4* in,
            float4 values,
            __global float4* out)
    {
        uint idx = get_global_id(0);
        out[idx] = in[idx] + values;
    }
"#;

fn fmt_duration(duration: chrono::Duration) -> String {
    let el_sec = duration.num_seconds();
    let el_ms = duration.num_milliseconds() - (el_sec * 1000);
    format!("{}.{} seconds", el_sec, el_ms)
}

pub fn async_process() -> OclResult<()> {
    let start_time = chrono::Local::now();

    let platform = Platform::default();
    printlnc!(blue: "Platform: {}", platform.name()?);

    let device = Device::first(platform)?;
    printlnc!(teal: "Device: {} {}", device.vendor()?, device.name()?);

    let context = Context::builder()
        .platform(platform)
        .devices(device)
        .build()?;

    let queue_flags = Some(CommandQueueProperties::new().out_of_order());
    let write_queue = Queue::new(&context, device, queue_flags)
        .or_else(|_| Queue::new(&context, device, None))?;
    let read_queue = Queue::new(&context, device, queue_flags)
        .or_else(|_| Queue::new(&context, device, None))?;
    let kern_queue = Queue::new(&context, device, queue_flags)
        .or_else(|_| Queue::new(&context, device, None))?;

    let thread_pool = CpuPool::new_num_cpus();
    let task_count = 12;
    let redundancy_count = 2000;
    let mut offloads = VecDeque::with_capacity(task_count);

    println!("Creating and enqueuing tasks...");

    for task_id in 0..task_count {
        let work_size = 1 << 14;

        let write_buf_flags = MemFlags::new().read_only().host_write_only();
        let read_buf_flags = MemFlags::new().write_only().host_read_only();

        // Create write and read buffers:
        let write_buf: Buffer<Float4> = Buffer::builder()
            .queue(write_queue.clone())
            .flags(write_buf_flags)
            .len(work_size)
            .build()?;

        let read_buf: Buffer<Float4> = Buffer::builder()
            .queue(read_queue.clone())
            .flags(read_buf_flags)
            .len(work_size)
            .build()?;

        // Create program and kernel:
        let program = Program::builder()
            .devices(device)
            .src(KERN_SRC)
            .build(&context)?;

        let kern = Kernel::builder()
            .name("add")
            .program(&program)
            .queue(kern_queue.clone())
            .global_work_size(work_size)
            .arg(&write_buf)
            .arg(Float4::new(100., 100., 100., 100.))
            .arg(&read_buf)
            .build()?;

        // (0) INIT: Fill buffer with -999's just to ensure the upcoming
        // write misses nothing:
        let mut fill_event = Event::empty();
        write_buf
            .cmd()
            .fill(Float4::new(-999., -999., -999., -999.), None)
            .enew(&mut fill_event)
            .enq()?;

        // (1) WRITE: Map the buffer and write 50's to the entire buffer, then
        // unmap to 'flush' data to the device:
        let mut future_write_data = unsafe {
            write_buf
                .cmd()
                .map()
                .flags(MapFlags::new().write_invalidate_region())
                // .ewait(&fill_event)
                .enq_async()?
        };

        // Since this is an invalidating write we'll use the wait list for the
        // unmap rather than the map command:
        future_write_data.set_unmap_wait_events(&fill_event);
        let write_unmap_event = future_write_data.create_unmap_event()?.clone();

        let write = future_write_data.and_then(move |mut data| {
            for _ in 0..redundancy_count {
                for val in data.iter_mut() {
                    *val = Float4::new(50., 50., 50., 50.);
                }
            }

            println!("Mapped write complete (task: {}). ", task_id);
            Ok(task_id)
        });

        let spawned_write = thread_pool.spawn(write);

        // (2) KERNEL: Run kernel: Add 100 to everything (total should now be 150):
        let mut kern_event = Event::empty();

        unsafe {
            kern.cmd()
                .enew(&mut kern_event)
                .ewait(&write_unmap_event)
                .enq()?;
        }

        // (3) READ: Read results and verify that the write and kernel have
        // both completed successfully:
        let future_read_data = unsafe {
            read_buf
                .cmd()
                .map()
                .flags(MapFlags::new().read())
                .ewait(&kern_event)
                .enq_async()?
        };

        let read = future_read_data.and_then(move |data| {
            let mut val_count = 0usize;

            for _ in 0..redundancy_count {
                for val in data.iter() {
                    let correct_val = Float4::new(150., 150., 150., 150.);
                    if *val != correct_val {
                        return Err(format!(
                            "Result value mismatch: {:?} != {:?}",
                            val, correct_val
                        )
                        .into());
                    }
                    val_count += 1;
                }
            }

            println!("Mapped read and verify complete (task: {}). ", task_id);

            Ok(val_count)
        });

        let spawned_read = thread_pool.spawn(read);
        // Presumably this could be either `join` or `and_then`:
        let offload = spawned_write.join(spawned_read);

        offloads.push_back(offload);
    }

    println!("Running tasks...");
    let create_duration = chrono::Local::now() - start_time;
    let correct_val_count = Cell::new(0usize);

    // Finish things up (basically a thread join):
    stream::futures_unordered(offloads)
        .for_each(|(task_id, val_count)| {
            correct_val_count.set(correct_val_count.get() + val_count);
            println!("Task: {} has completed.", task_id);
            Ok(())
        })
        .wait()?;

    let run_duration = chrono::Local::now() - start_time - create_duration;
    let total_duration = chrono::Local::now() - start_time;

    printlnc!(yellow_bold: "All {} (float4) result values are correct! \n\
        Durations => | Create/Enqueue: {} | Run: {} | Total: {} |",
        correct_val_count.get() / redundancy_count, fmt_duration(create_duration),
        fmt_duration(run_duration), fmt_duration(total_duration));
    Ok(())
}

pub fn main() {
    match async_process() {
        Ok(_) => (),
        Err(err) => println!("{}", err),
    }
}