// ispc_rt/task.rs

1//! Defines structs for operating on ISPC task groups and getting chunks
2//! of a task to be scheduled on to threads
3
4use libc;
5
6use std::cmp;
7use std::iter::Iterator;
8use std::sync::atomic::{self, AtomicPtr, AtomicUsize};
9use std::sync::{Arc, Mutex, RwLock};
10
/// A pointer to an ISPC task function.
///
/// The ISPC task function pointer is:
/// ```c
/// void (*TaskFuncPtr)(void *data, int threadIndex, int threadCount,
///                     int taskIndex, int taskCount,
///                     int taskIndex0, int taskIndex1, int taskIndex2,
///                     int taskCount0, int taskCount1, int taskCount2);
/// ```
///
/// `data` is the user parameter block passed to `launch`; the index/count
/// arguments identify the task within its 3D launch grid.
pub type ISPCTaskFn = extern "C" fn(
    data: *mut libc::c_void,
    thread_idx: libc::c_int,
    thread_cnt: libc::c_int,
    task_idx: libc::c_int,
    task_cnt: libc::c_int,
    task_idx0: libc::c_int,
    task_idx1: libc::c_int,
    task_idx2: libc::c_int,
    task_cnt0: libc::c_int,
    task_cnt1: libc::c_int,
    task_cnt2: libc::c_int,
);
33
34/// A list of all task groups spawned by a function in some launch context which
35/// will be sync'd at an explicit `sync` call or function exit.
36///
37/// **Note:** A Context is done if and only if `ISPCSync` has been called with
38/// its handle and all of its tasks are finished. Until `ISPCSync` is called on the
39/// Context's handle more tasks could be launched.
40///
41/// Additionally, because we're not really able to associate a call to `ISPCAlloc`
42/// with a specific Group care must be taken that the Context is not dropped
43/// until `ISPCSync` has been called on its handle and all Groups within have
44/// completed execution.
#[derive(Debug)]
pub struct Context {
    /// Task groups launched by this context. Each group sits behind an `Arc`
    /// so a worker thread can clone the handle and run chunks without holding
    /// the vec's read lock, letting other threads push new groups concurrently.
    tasks: RwLock<Vec<Arc<Group>>>,
    /// Allocations made via `alloc` on behalf of this context's task groups,
    /// paired with the `Layout` needed to free them when the Context drops.
    mem: Mutex<Vec<(AtomicPtr<libc::c_void>, std::alloc::Layout)>>,
    /// A unique identifier for this context
    pub id: usize,
}
61
62impl Context {
63    /// Create a new list of tasks for some function with id `id`
64    pub fn new(id: usize) -> Context {
65        Context {
66            tasks: RwLock::new(Vec::new()),
67            mem: Mutex::new(Vec::new()),
68            id,
69        }
70    }
71    /// Add a task group for execution that was launched in this context
72    pub fn launch(&self, total: (i32, i32, i32), data: *mut libc::c_void, fcn: ISPCTaskFn) {
73        self.tasks
74            .write()
75            .unwrap()
76            .push(Arc::new(Group::new(total, AtomicPtr::new(data), fcn)));
77    }
78    /// Check if all tasks currently in the task list are completed
79    ///
80    /// **Note:** A Context is done if and only if ISPCSync has been called with
81    /// its handle and all of its tasks are finished. Until ISPCSync is called on the
82    /// Context's handle more tasks could be launched.
83    /// TODO: With this design we're essentially requiring the thread waiting on the context
84    /// to busy wait since we provide no condition variable to block on.
85    pub fn current_tasks_done(&self) -> bool {
86        self.tasks.read().unwrap().iter().all(|t| t.is_finished())
87    }
88    /// Allocate some memory for this Context's task groups, returns a pointer to the allocated memory.
89    ///
90    /// # Safety
91    /// This function is unsafe as it is used to perform a raw memory allocation to be passed back
92    /// to ISPC
93    pub unsafe fn alloc(&self, size: usize, align: usize) -> *mut libc::c_void {
94        // TODO: The README for this lib mentions it may be slow. Maybe use some other allocator?
95        let layout = std::alloc::Layout::from_size_align(size, align)
96            .expect("std::alloc::Layout is invalid. Make sure the align is a power of 2");
97        let ptr = std::alloc::alloc(layout) as *mut libc::c_void;
98        let mut mem = self.mem.lock().unwrap();
99        mem.push((AtomicPtr::new(ptr), layout));
100        ptr
101    }
102    /// An iterator over the **current** groups in the context which have remaining tasks to
103    /// run on a thread. If more task groups are added before this iterator has returned
104    /// None those will appear as well.
105    pub fn iter(&self) -> ContextIter<'_> {
106        ContextIter { context: self }
107    }
108    /// Get a Group with tasks remaining to be executed, returns None if there
109    /// are no groups left to run in this context.
110    ///
111    /// Note that you can't assume that the Group you get back is guaranteed
112    /// to have tasks remaining since between the time of checking that the
113    /// group has outstanding tasks and getting the group back to call `chunks`
114    /// those remaining tasks may have been taken by another thread.
115    fn get_active_group(&self) -> Option<Arc<Group>> {
116        let tasks = self.tasks.read().unwrap();
117        for group in tasks.iter() {
118            if group.has_tasks() {
119                return Some(Arc::clone(group));
120            }
121        }
122        None
123    }
124}
125
126impl Drop for Context {
127    /// Release memory for all the tasks in this context
128    ///
129    /// **Note:** that because we're not really able to associate a call to ISPCAlloc
130    /// with a specific Group care must be taken that the Context is not dropped
131    /// until ISPCSync has been called on its handle and all Groups within have
132    /// completed execution.
133    fn drop(&mut self) {
134        let mut mem = self.mem.lock().unwrap();
135        for tup in mem.drain(0..) {
136            let ptr = tup.0;
137            let layout = tup.1;
138            let m = ptr.load(atomic::Ordering::SeqCst);
139            unsafe { std::alloc::dealloc(m as *mut u8, layout) };
140        }
141    }
142}
143
/// An iterator over the **current** groups in the context which have remaining tasks to
/// run on a thread. If more task groups are added before this iterator has returned
/// None those will appear as well.
pub struct ContextIter<'a> {
    /// The context whose groups are scanned for remaining work on each `next`
    context: &'a Context,
}
150
151impl Iterator for ContextIter<'_> {
152    type Item = Arc<Group>;
153
154    /// Get a Group with tasks remaining to be executed, returns None if there
155    /// are no groups left to run in this context.
156    ///
157    /// Note that you can't assume that the Group you get back is guaranteed
158    /// to have tasks remaining since between the time of checking that the
159    /// group has outstanding tasks and getting the group back to call `chunks`
160    /// those remaining tasks may have been taken by another thread.
161    fn next(&mut self) -> Option<Arc<Group>> {
162        self.context.get_active_group()
163    }
164}
165
/// A group of tasks spawned by a call to `launch` in ISPC
#[derive(Debug)]
pub struct Group {
    /// Index of the next unclaimed task; threads atomically claim ranges of
    /// tasks starting here via `get_chunk`. Tasks are currently handed out in
    /// flat row-major order (like a nested for loop over the 3D grid).
    start: AtomicUsize,
    /// One past the last task index, i.e. `total.0 * total.1 * total.2`
    end: usize,
    /// Total number of tasks scheduled in this group
    pub total: (i32, i32, i32),
    /// Function to run for this task
    pub fcn: ISPCTaskFn,
    /// Data pointer to user params to pass to the function
    pub data: AtomicPtr<libc::c_void>,
    /// Tracks how many chunks we've given out so far to threads
    chunks_launched: AtomicUsize,
    /// Tracks how many of the chunks we gave out are completed. A group is finished
    /// only when all launched chunks are done AND `start >= end` — the last
    /// chunk to run cannot simply mark the group done, because chunks handed
    /// out earlier may still be executing. Call `is_finished` to check.
    chunks_finished: AtomicUsize,
}
196
197impl Group {
198    /// Create a new task group for execution of the function
199    pub fn new(total: (i32, i32, i32), data: AtomicPtr<libc::c_void>, fcn: ISPCTaskFn) -> Group {
200        Group {
201            start: AtomicUsize::new(0),
202            end: (total.0 * total.1 * total.2) as usize,
203            total,
204            data,
205            fcn,
206            chunks_launched: AtomicUsize::new(0),
207            chunks_finished: AtomicUsize::new(0),
208        }
209    }
210    /// Get an iterator over `chunk_size` chunks of tasks to be executed for this group
211    pub fn chunks(&self, chunk_size: usize) -> GroupChunks<'_> {
212        GroupChunks {
213            group: self,
214            chunk_size,
215        }
216    }
217    /// Check if all tasks for this group have been completed
218    pub fn is_finished(&self) -> bool {
219        let finished = self.chunks_finished.load(atomic::Ordering::SeqCst);
220        let launched = self.chunks_launched.load(atomic::Ordering::SeqCst);
221        let start = self.start.load(atomic::Ordering::SeqCst);
222        // This shouldn't happen, if it does some bad threading voodoo is afoot
223        assert!(finished <= launched);
224        finished == launched && start >= self.end
225    }
226    /// Check if this group has tasks left to execute
227    fn has_tasks(&self) -> bool {
228        let start = self.start.load(atomic::Ordering::SeqCst);
229        start < self.end
230    }
231    /// Get a chunk of tasks from the group to run if there are any tasks left to run
232    ///
233    /// `desired_tasks` specifies the number of tasks we'd like the chunk to contain,
234    /// though you may get fewer if there aren't that many tasks left. If the chunk
235    /// you get is the last chunk to be executed (`chunk.end == total.0 * total.1 * total.2`)
236    /// you must mark this group as finished upon completing execution of the chunk
237    fn get_chunk(&self, desired_tasks: usize) -> Option<Chunk<'_>> {
238        let start = self
239            .start
240            .fetch_add(desired_tasks, atomic::Ordering::SeqCst);
241        if start < self.end {
242            // Give the chunk 4 tasks or whatever remain
243            let c = Some(Chunk::new(
244                self,
245                start,
246                cmp::min(start + desired_tasks, self.end),
247            ));
248            self.chunks_launched.fetch_add(1, atomic::Ordering::SeqCst);
249            c
250        } else {
251            None
252        }
253    }
254}
255
/// An iterator over chunks of tasks to be executed in a Group
pub struct GroupChunks<'a> {
    /// The group whose remaining tasks are claimed chunk by chunk
    group: &'a Group,
    /// Maximum number of tasks handed out per chunk
    chunk_size: usize,
}
261
262impl<'a> Iterator for GroupChunks<'a> {
263    type Item = Chunk<'a>;
264
265    /// Get the next chunk of tasks to be executed
266    fn next(&mut self) -> Option<Chunk<'a>> {
267        self.group.get_chunk(self.chunk_size)
268    }
269}
270
/// A chunk of tasks from a Group to be executed
///
/// Executes task in the range [start, end)
#[derive(Debug)]
pub struct Chunk<'a> {
    /// The first task to be executed in this chunk
    start: i32,
    /// One past the last task to be executed in this chunk (exclusive bound)
    end: i32,
    /// Total number of tasks scheduled in the group this chunk came from
    total: (i32, i32, i32),
    /// Function to run for this task
    fcn: ISPCTaskFn,
    /// Data pointer to user params to pass to the function
    data: AtomicPtr<libc::c_void>,
    /// The group this chunk is running tasks from; notified on completion
    group: &'a Group,
}
289
290impl Chunk<'_> {
291    /// Create a new chunk to execute tasks in the group from [start, end)
292    pub fn new(group: &Group, start: usize, end: usize) -> Chunk<'_> {
293        let d = AtomicPtr::new(group.data.load(atomic::Ordering::SeqCst));
294        Chunk {
295            start: start as i32,
296            end: end as i32,
297            total: group.total,
298            fcn: group.fcn,
299            data: d,
300            group,
301        }
302    }
303    /// Execute all tasks in this chunk
304    pub fn execute(&self, thread_id: i32, total_threads: i32) {
305        let total_tasks = self.total.0 * self.total.1 * self.total.2;
306        let data = self.data.load(atomic::Ordering::SeqCst);
307        for t in self.start..self.end {
308            let id = self.task_indices(t);
309            (self.fcn)(
310                data,
311                thread_id as libc::c_int,
312                total_threads as libc::c_int,
313                t as libc::c_int,
314                total_tasks as libc::c_int,
315                id.0 as libc::c_int,
316                id.1 as libc::c_int,
317                id.2 as libc::c_int,
318                self.total.0 as libc::c_int,
319                self.total.1 as libc::c_int,
320                self.total.2 as libc::c_int,
321            );
322        }
323        // Tell the group this chunk is done
324        self.group
325            .chunks_finished
326            .fetch_add(1, atomic::Ordering::SeqCst);
327    }
328    /// Get the global task id for the task index
329    fn task_indices(&self, id: i32) -> (i32, i32, i32) {
330        (
331            id % self.total.0,
332            (id / self.total.0) % self.total.1,
333            id / (self.total.0 * self.total.1),
334        )
335    }
336}