use libc;
use std::cmp;
use std::iter::Iterator;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Mutex, RwLock};
/// Function-pointer type (C ABI) for a task entry point.
///
/// `Chunk::execute` calls it once per task, passing in order: the group's
/// `data` pointer, the executing thread's index and the thread count, the
/// task's linear index and the total task count, the task's index along each
/// of the three launch dimensions, and the size of each dimension.
///
/// NOTE(review): the name suggests these functions are emitted by the ISPC
/// compiler for `launch` statements -- confirm against the FFI layer.
pub type ISPCTaskFn = extern "C" fn(
data: *mut libc::c_void,
thread_idx: libc::c_int,
thread_cnt: libc::c_int,
task_idx: libc::c_int,
task_cnt: libc::c_int,
task_idx0: libc::c_int,
task_idx1: libc::c_int,
task_idx2: libc::c_int,
task_cnt0: libc::c_int,
task_cnt1: libc::c_int,
task_cnt2: libc::c_int,
);
/// A launch context: the task groups launched into it plus the memory
/// allocated through it.
///
/// Memory handed out by `alloc` is released when the context is dropped.
#[derive(Debug)]
pub struct Context {
/// Task groups launched into this context, in launch order.
tasks: RwLock<Vec<Arc<Group>>>,
/// Every allocation made through `alloc`, paired with the layout it was
/// allocated with so it can be deallocated on drop.
mem: Mutex<Vec<(AtomicPtr<libc::c_void>, std::alloc::Layout)>>,
/// Identifier supplied by the creator; not interpreted by the code in this file.
pub id: usize,
}
impl Context {
    /// Creates an empty context with the given identifier: no task groups
    /// and no tracked allocations.
    pub fn new(id: usize) -> Context {
        Context {
            tasks: RwLock::new(Vec::new()),
            mem: Mutex::new(Vec::new()),
            id,
        }
    }

    /// Registers a new task group in this context.
    ///
    /// `total` gives the number of tasks along each of the three launch
    /// dimensions, `data` is the pointer handed to every task and `fcn` is
    /// the task entry point. Nothing is executed here; the group only becomes
    /// visible to `iter` and `current_tasks_done`.
    ///
    /// # Panics
    ///
    /// Panics if the task list lock is poisoned.
    pub fn launch(&self, total: (i32, i32, i32), data: *mut libc::c_void, fcn: ISPCTaskFn) {
        // Build the group before taking the write lock so the lock is held
        // only for the push itself.
        let group = Arc::new(Group::new(total, AtomicPtr::new(data), fcn));
        self.tasks.write().unwrap().push(group);
    }

    /// Returns `true` when every group launched in this context reports
    /// itself finished (trivially `true` when nothing has been launched).
    ///
    /// # Panics
    ///
    /// Panics if the task list lock is poisoned.
    pub fn current_tasks_done(&self) -> bool {
        let groups = self.tasks.read().unwrap();
        groups.iter().all(|group| group.is_finished())
    }

    /// Allocates `size` bytes aligned to `align` and records the allocation
    /// so it is released when the context is dropped.
    ///
    /// Zero-sized requests are rounded up to one byte, because the global
    /// allocator must not be called with a zero-sized layout. The returned
    /// pointer is never null: allocation failure is routed through
    /// `std::alloc::handle_alloc_error`.
    ///
    /// # Safety
    ///
    /// The returned memory is uninitialized and owned by this context. The
    /// caller must not free it and must not use it after the context has
    /// been dropped.
    ///
    /// # Panics
    ///
    /// Panics if `size`/`align` do not form a valid layout (for example when
    /// `align` is not a power of two) or if the allocation list lock is
    /// poisoned.
    pub unsafe fn alloc(&self, size: usize, align: usize) -> *mut libc::c_void {
        let layout = std::alloc::Layout::from_size_align(size.max(1), align)
            .expect("std::alloc::Layout is invalid. Make sure the align is a power of 2");
        // SAFETY: `layout` has a non-zero size thanks to the `max(1)` above.
        let raw = std::alloc::alloc(layout);
        if raw.is_null() {
            // Never hand a null pointer to the caller or to `dealloc` later.
            std::alloc::handle_alloc_error(layout);
        }
        let ptr = raw as *mut libc::c_void;
        self.mem
            .lock()
            .unwrap()
            .push((AtomicPtr::new(ptr), layout));
        ptr
    }

    /// Returns an iterator over groups that still have unclaimed tasks.
    ///
    /// Each call to `next` yields the first such group in launch order, so
    /// the same group may be yielded repeatedly until all of its tasks have
    /// been claimed.
    pub fn iter(&self) -> ContextIter<'_> {
        ContextIter { context: self }
    }

    /// Returns the first group (in launch order) that still has unclaimed
    /// tasks, or `None` when there is none.
    fn get_active_group(&self) -> Option<Arc<Group>> {
        let groups = self.tasks.read().unwrap();
        groups.iter().find(|group| group.has_tasks()).cloned()
    }
}
impl Drop for Context {
    /// Releases every allocation that was recorded for this context.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so no locking is needed.
        // A poisoned mutex is recovered instead of unwrapped: panicking in
        // `drop` would abort during unwinding and leak every allocation.
        let allocations = self
            .mem
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for (ptr, layout) in allocations.drain(..) {
            let raw = ptr.into_inner();
            if raw.is_null() {
                // A failed allocation may have been recorded; passing null to
                // `dealloc` would be undefined behaviour.
                continue;
            }
            // SAFETY: every non-null pointer in the list was returned by
            // `std::alloc::alloc` in `Context::alloc` with the layout stored
            // next to it, and draining ensures it is freed exactly once.
            unsafe { std::alloc::dealloc(raw as *mut u8, layout) };
        }
    }
}
/// Iterator returned by `Context::iter`; yields groups of the borrowed
/// context that still have unclaimed tasks.
pub struct ContextIter<'a> {
/// Context whose task groups are inspected on every call to `next`.
context: &'a Context,
}
impl Iterator for ContextIter<'_> {
    type Item = Arc<Group>;

    /// Yields the first group of the context that still has unclaimed tasks,
    /// or `None` once no group has any left. The same group can be returned
    /// by consecutive calls.
    fn next(&mut self) -> Option<Self::Item> {
        let context = self.context;
        context.get_active_group()
    }
}
/// A group of tasks created by a single launch: one task function, one data
/// pointer and a three-dimensional task count, plus the bookkeeping needed to
/// hand the tasks out in chunks.
#[derive(Debug)]
pub struct Group {
/// Linear index of the next task that has not been handed out yet; the
/// group has tasks left while this is below `end`.
start: AtomicUsize,
/// Total number of tasks in the group (product of the three counts in `total`).
end: usize,
/// Number of tasks along each of the three launch dimensions.
pub total: (i32, i32, i32),
/// Task entry point invoked once per task.
pub fcn: ISPCTaskFn,
/// Pointer passed as the first argument to every invocation of `fcn`.
pub data: AtomicPtr<libc::c_void>,
/// Number of chunks handed out so far.
chunks_launched: AtomicUsize,
/// Number of chunks whose execution has completed.
chunks_finished: AtomicUsize,
}
impl Group {
    /// Creates a group of `total.0 * total.1 * total.2` tasks that will call
    /// `fcn` with `data`.
    ///
    /// A group with any non-positive dimension contains no tasks and is
    /// finished immediately.
    ///
    /// # Panics
    ///
    /// Panics if the total task count does not fit in an `i32`, because task
    /// indices are passed to the task function as C `int`s.
    pub fn new(total: (i32, i32, i32), data: AtomicPtr<libc::c_void>, fcn: ISPCTaskFn) -> Group {
        let dims = [total.0, total.1, total.2];
        let end = if dims.iter().any(|&d| d <= 0) {
            // Casting a negative product to `usize` would produce an
            // astronomically large task count; treat it as "nothing to do".
            0
        } else {
            // Multiply in 64 bits so that an overflowing count is detected
            // instead of silently wrapping.
            let count = dims
                .iter()
                .fold(1i64, |acc, &d| acc.saturating_mul(i64::from(d)));
            assert!(
                count <= i64::from(i32::MAX),
                "total task count {}x{}x{} does not fit in an i32",
                total.0,
                total.1,
                total.2
            );
            count as usize
        };
        Group {
            start: AtomicUsize::new(0),
            end,
            total,
            data,
            fcn,
            chunks_launched: AtomicUsize::new(0),
            chunks_finished: AtomicUsize::new(0),
        }
    }

    /// Returns an iterator that claims chunks of at most `chunk_size` tasks
    /// from this group until no unclaimed tasks remain. A `chunk_size` of
    /// zero is treated as one.
    pub fn chunks(&self, chunk_size: usize) -> GroupChunks<'_> {
        GroupChunks {
            group: self,
            chunk_size,
        }
    }

    /// Returns `true` once every task has been handed out and every chunk
    /// that was handed out has finished executing.
    pub fn is_finished(&self) -> bool {
        // Load order matters. `get_chunk` counts a chunk as launched *before*
        // it claims tasks, so once `start >= end` is observed here every
        // chunk that will ever exist is already included in the subsequent
        // `chunks_launched` load. Reading `chunks_finished` before
        // `chunks_launched` keeps `finished <= launched` true for the values
        // observed.
        let start = self.start.load(atomic::Ordering::SeqCst);
        let finished = self.chunks_finished.load(atomic::Ordering::SeqCst);
        let launched = self.chunks_launched.load(atomic::Ordering::SeqCst);
        assert!(finished <= launched);
        finished == launched && start >= self.end
    }

    /// Returns `true` while at least one task has not been handed out yet.
    fn has_tasks(&self) -> bool {
        self.start.load(atomic::Ordering::SeqCst) < self.end
    }

    /// Claims the next chunk of at most `desired_tasks` tasks (at least one),
    /// or returns `None` when all tasks have already been handed out.
    fn get_chunk(&self, desired_tasks: usize) -> Option<Chunk<'_>> {
        // A step of zero would never advance `start` and would hand out empty
        // chunks forever.
        let step = desired_tasks.max(1);
        // Count the chunk as launched before claiming its tasks. Otherwise
        // another thread could see the last tasks claimed while the launch
        // counter still equals the finished counter and conclude that the
        // group is done before this chunk has run.
        self.chunks_launched.fetch_add(1, atomic::Ordering::SeqCst);
        let start = self.start.fetch_add(step, atomic::Ordering::SeqCst);
        if start < self.end {
            let end = cmp::min(start.saturating_add(step), self.end);
            Some(Chunk::new(self, start, end))
        } else {
            // Nothing was claimed: undo the speculative launch count.
            self.chunks_launched.fetch_sub(1, atomic::Ordering::SeqCst);
            None
        }
    }
}
/// Iterator returned by `Group::chunks`; each item is a freshly claimed chunk
/// of the borrowed group's tasks.
pub struct GroupChunks<'a> {
/// Group the chunks are claimed from.
group: &'a Group,
/// Number of tasks requested for each chunk.
chunk_size: usize,
}
impl<'a> Iterator for GroupChunks<'a> {
type Item = Chunk<'a>;
fn next(&mut self) -> Option<Chunk<'a>> {
self.group.get_chunk(self.chunk_size)
}
}
/// A contiguous range of tasks claimed from a `Group`, ready to be executed
/// by one thread.
#[derive(Debug)]
pub struct Chunk<'a> {
/// Linear index of the first task in the chunk (inclusive).
start: i32,
/// Linear index one past the last task in the chunk (exclusive).
end: i32,
/// Task counts along the three launch dimensions, copied from the group.
total: (i32, i32, i32),
/// Task entry point, copied from the group.
fcn: ISPCTaskFn,
/// Copy of the group's data pointer, taken when the chunk was created.
data: AtomicPtr<libc::c_void>,
/// Group the chunk was claimed from; its finished-chunk counter is
/// incremented when `execute` completes.
group: &'a Group,
}
impl Chunk<'_> {
    /// Builds a chunk covering the linear task range `start..end` of `group`,
    /// snapshotting the group's data pointer, task function and dimensions.
    pub fn new(group: &Group, start: usize, end: usize) -> Chunk<'_> {
        let raw_data = group.data.load(atomic::Ordering::SeqCst);
        Chunk {
            start: start as i32,
            end: end as i32,
            total: group.total,
            fcn: group.fcn,
            data: AtomicPtr::new(raw_data),
            group,
        }
    }

    /// Runs every task of the chunk in ascending order on the calling thread,
    /// then records the chunk as finished on its group.
    ///
    /// `thread_id` and `total_threads` are forwarded unchanged to the task
    /// function.
    pub fn execute(&self, thread_id: i32, total_threads: i32) {
        let (count0, count1, count2) = self.total;
        let task_count = count0 * count1 * count2;
        let payload = self.data.load(atomic::Ordering::SeqCst);
        let run = self.fcn;

        (self.start..self.end).for_each(|task| {
            let (idx0, idx1, idx2) = self.task_indices(task);
            run(
                payload,
                thread_id as libc::c_int,
                total_threads as libc::c_int,
                task as libc::c_int,
                task_count as libc::c_int,
                idx0 as libc::c_int,
                idx1 as libc::c_int,
                idx2 as libc::c_int,
                count0 as libc::c_int,
                count1 as libc::c_int,
                count2 as libc::c_int,
            );
        });

        // Signal completion only after every task in the chunk has returned.
        let finished_counter = &self.group.chunks_finished;
        finished_counter.fetch_add(1, atomic::Ordering::SeqCst);
    }

    /// Converts a linear task index into its three per-dimension indices,
    /// with the first dimension varying fastest.
    fn task_indices(&self, id: i32) -> (i32, i32, i32) {
        let (dim0, dim1, _) = self.total;
        let first = id % dim0;
        let second = (id / dim0) % dim1;
        let third = id / (dim0 * dim1);
        (first, second, third)
    }
}