// ispc_rt/task.rs
//! Defines structs for operating on ISPC task groups and getting chunks
//! of a task to be scheduled on to threads
3
4use libc;
5
6use std::cmp;
7use std::iter::Iterator;
8use std::sync::atomic::{self, AtomicPtr, AtomicUsize};
9use std::sync::{Arc, Mutex, RwLock};
10
/// A pointer to an ISPC task function.
///
/// The ISPC task function pointer is:
/// ```c
/// void (*TaskFuncPtr)(void *data, int threadIndex, int threadCount,
///                     int taskIndex, int taskCount,
///                     int taskIndex0, int taskIndex1, int taskIndex2,
///                     int taskCount0, int taskCount1, int taskCount2);
/// ```
///
/// `task_idx` is the flat index in `[0, task_cnt)`; the `task_idx{0,1,2}` /
/// `task_cnt{0,1,2}` values are its (x, y, z) decomposition in the task grid
/// (see `Chunk::task_indices`).
pub type ISPCTaskFn = extern "C" fn(
    data: *mut libc::c_void,
    thread_idx: libc::c_int,
    thread_cnt: libc::c_int,
    task_idx: libc::c_int,
    task_cnt: libc::c_int,
    task_idx0: libc::c_int,
    task_idx1: libc::c_int,
    task_idx2: libc::c_int,
    task_cnt0: libc::c_int,
    task_cnt1: libc::c_int,
    task_cnt2: libc::c_int,
);
33
/// A list of all task groups spawned by a function in some launch context which
/// will be sync'd at an explicit `sync` call or function exit.
///
/// **Note:** A Context is done if and only if `ISPCSync` has been called with
/// its handle and all of its tasks are finished. Until `ISPCSync` is called on the
/// Context's handle more tasks could be launched.
///
/// Additionally, because we're not really able to associate a call to `ISPCAlloc`
/// with a specific Group care must be taken that the Context is not dropped
/// until `ISPCSync` has been called on its handle and all Groups within have
/// completed execution.
#[derive(Debug)]
pub struct Context {
    /// Task groups launched by this function.
    ///
    /// Groups are stored behind `Arc`s so a worker thread can take a clone of
    /// a group (see `get_active_group`) and run its tasks without holding the
    /// read lock, which lets other threads push new groups concurrently.
    tasks: RwLock<Vec<Arc<Group>>>,
    /// The memory allocated for the various task group's parameters, paired
    /// with the `Layout` it was allocated with so it can be freed on drop.
    mem: Mutex<Vec<(AtomicPtr<libc::c_void>, std::alloc::Layout)>>,
    /// A unique identifier for this context
    pub id: usize,
}
61
62impl Context {
63 /// Create a new list of tasks for some function with id `id`
64 pub fn new(id: usize) -> Context {
65 Context {
66 tasks: RwLock::new(Vec::new()),
67 mem: Mutex::new(Vec::new()),
68 id,
69 }
70 }
71 /// Add a task group for execution that was launched in this context
72 pub fn launch(&self, total: (i32, i32, i32), data: *mut libc::c_void, fcn: ISPCTaskFn) {
73 self.tasks
74 .write()
75 .unwrap()
76 .push(Arc::new(Group::new(total, AtomicPtr::new(data), fcn)));
77 }
78 /// Check if all tasks currently in the task list are completed
79 ///
80 /// **Note:** A Context is done if and only if ISPCSync has been called with
81 /// its handle and all of its tasks are finished. Until ISPCSync is called on the
82 /// Context's handle more tasks could be launched.
83 /// TODO: With this design we're essentially requiring the thread waiting on the context
84 /// to busy wait since we provide no condition variable to block on.
85 pub fn current_tasks_done(&self) -> bool {
86 self.tasks.read().unwrap().iter().all(|t| t.is_finished())
87 }
88 /// Allocate some memory for this Context's task groups, returns a pointer to the allocated memory.
89 ///
90 /// # Safety
91 /// This function is unsafe as it is used to perform a raw memory allocation to be passed back
92 /// to ISPC
93 pub unsafe fn alloc(&self, size: usize, align: usize) -> *mut libc::c_void {
94 // TODO: The README for this lib mentions it may be slow. Maybe use some other allocator?
95 let layout = std::alloc::Layout::from_size_align(size, align)
96 .expect("std::alloc::Layout is invalid. Make sure the align is a power of 2");
97 let ptr = std::alloc::alloc(layout) as *mut libc::c_void;
98 let mut mem = self.mem.lock().unwrap();
99 mem.push((AtomicPtr::new(ptr), layout));
100 ptr
101 }
102 /// An iterator over the **current** groups in the context which have remaining tasks to
103 /// run on a thread. If more task groups are added before this iterator has returned
104 /// None those will appear as well.
105 pub fn iter(&self) -> ContextIter<'_> {
106 ContextIter { context: self }
107 }
108 /// Get a Group with tasks remaining to be executed, returns None if there
109 /// are no groups left to run in this context.
110 ///
111 /// Note that you can't assume that the Group you get back is guaranteed
112 /// to have tasks remaining since between the time of checking that the
113 /// group has outstanding tasks and getting the group back to call `chunks`
114 /// those remaining tasks may have been taken by another thread.
115 fn get_active_group(&self) -> Option<Arc<Group>> {
116 let tasks = self.tasks.read().unwrap();
117 for group in tasks.iter() {
118 if group.has_tasks() {
119 return Some(Arc::clone(group));
120 }
121 }
122 None
123 }
124}
125
126impl Drop for Context {
127 /// Release memory for all the tasks in this context
128 ///
129 /// **Note:** that because we're not really able to associate a call to ISPCAlloc
130 /// with a specific Group care must be taken that the Context is not dropped
131 /// until ISPCSync has been called on its handle and all Groups within have
132 /// completed execution.
133 fn drop(&mut self) {
134 let mut mem = self.mem.lock().unwrap();
135 for tup in mem.drain(0..) {
136 let ptr = tup.0;
137 let layout = tup.1;
138 let m = ptr.load(atomic::Ordering::SeqCst);
139 unsafe { std::alloc::dealloc(m as *mut u8, layout) };
140 }
141 }
142}
143
/// An iterator over the **current** groups in the context which have remaining tasks to
/// run on a thread. If more task groups are added before this iterator has returned
/// None those will appear as well.
pub struct ContextIter<'a> {
    /// Context whose groups are polled on each call to `next`
    context: &'a Context,
}
150
151impl Iterator for ContextIter<'_> {
152 type Item = Arc<Group>;
153
154 /// Get a Group with tasks remaining to be executed, returns None if there
155 /// are no groups left to run in this context.
156 ///
157 /// Note that you can't assume that the Group you get back is guaranteed
158 /// to have tasks remaining since between the time of checking that the
159 /// group has outstanding tasks and getting the group back to call `chunks`
160 /// those remaining tasks may have been taken by another thread.
161 fn next(&mut self) -> Option<Arc<Group>> {
162 self.context.get_active_group()
163 }
164}
165
/// A group of tasks spawned by a call to `launch` in ISPC
#[derive(Debug)]
pub struct Group {
    /// Next task index to hand out. Advanced atomically by `get_chunk` as
    /// threads claim chunks of tasks; because claims use `fetch_add` it may
    /// overshoot `end` once the group is exhausted, which is harmless.
    /// TODO: Right now we just schedule tasks like in a nested for loop,
    /// would some tiled scheduling be better?
    start: AtomicUsize,
    /// One past the last task index, i.e. total.0 * total.1 * total.2
    end: usize,
    /// Total number of tasks scheduled in this group, as the (x, y, z)
    /// dimensions of the task grid
    pub total: (i32, i32, i32),
    /// Function to run for this task
    pub fcn: ISPCTaskFn,
    /// Data pointer to user params to pass to the function
    pub data: AtomicPtr<libc::c_void>,
    /// Tracks how many chunks we've given out so far to threads
    chunks_launched: AtomicUsize,
    /// Tracks how many of the chunks we gave out are completed. A group is finished
    /// only when all chunks are done and start >= total tasks, call `is_finished` to check.
    /// We can't just have the last chunk executed mark the group as done
    /// because earlier chunks might still be running.
    ///
    /// TODO: I'm unsure whether a semaphore/condvar would be the better choice
    /// here; the TASK_LIST would want to send an alert when new tasks are
    /// pushed so in Sync we could wait on the context to finish?
    chunks_finished: AtomicUsize,
}
196
impl Group {
    /// Create a new task group for execution of the function
    ///
    /// `total` is the (x, y, z) size of the task grid; the group covers the
    /// flat task range [0, total.0 * total.1 * total.2).
    pub fn new(total: (i32, i32, i32), data: AtomicPtr<libc::c_void>, fcn: ISPCTaskFn) -> Group {
        Group {
            start: AtomicUsize::new(0),
            // NOTE(review): assumes each dimension is positive and the product
            // fits in i32 — a negative or overflowed product would wrap to a
            // huge usize here. TODO confirm ISPC guarantees this.
            end: (total.0 * total.1 * total.2) as usize,
            total,
            data,
            fcn,
            chunks_launched: AtomicUsize::new(0),
            chunks_finished: AtomicUsize::new(0),
        }
    }
    /// Get an iterator over `chunk_size` chunks of tasks to be executed for this group
    pub fn chunks(&self, chunk_size: usize) -> GroupChunks<'_> {
        GroupChunks {
            group: self,
            chunk_size,
        }
    }
    /// Check if all tasks for this group have been completed
    ///
    /// True only when every task has been handed out (`start >= end`) *and*
    /// every chunk handed out has reported completion — launched == finished
    /// alone isn't enough, since an earlier chunk may still be running after
    /// the last one was claimed.
    pub fn is_finished(&self) -> bool {
        let finished = self.chunks_finished.load(atomic::Ordering::SeqCst);
        let launched = self.chunks_launched.load(atomic::Ordering::SeqCst);
        let start = self.start.load(atomic::Ordering::SeqCst);
        // This shouldn't happen, if it does some bad threading voodoo is afoot
        assert!(finished <= launched);
        finished == launched && start >= self.end
    }
    /// Check if this group has tasks left to execute
    fn has_tasks(&self) -> bool {
        let start = self.start.load(atomic::Ordering::SeqCst);
        start < self.end
    }
    /// Get a chunk of tasks from the group to run if there are any tasks left to run
    ///
    /// `desired_tasks` specifies the number of tasks we'd like the chunk to contain,
    /// though you may get fewer if there aren't that many tasks left. If the chunk
    /// you get is the last chunk to be executed (`chunk.end == total.0 * total.1 * total.2`)
    /// you must mark this group as finished upon completing execution of the chunk
    fn get_chunk(&self, desired_tasks: usize) -> Option<Chunk<'_>> {
        // Atomically claim the next `desired_tasks` task ids. Once the group
        // is exhausted `start` keeps advancing past `end` on further calls,
        // which is benign: such claims simply return None below.
        let start = self
            .start
            .fetch_add(desired_tasks, atomic::Ordering::SeqCst);
        if start < self.end {
            // Give the chunk `desired_tasks` tasks or whatever remain
            let c = Some(Chunk::new(
                self,
                start,
                cmp::min(start + desired_tasks, self.end),
            ));
            self.chunks_launched.fetch_add(1, atomic::Ordering::SeqCst);
            c
        } else {
            None
        }
    }
}
255
/// An iterator over chunks of tasks to be executed in a Group
///
/// Produced by `Group::chunks`; each `next` call atomically claims up to
/// `chunk_size` tasks from the group until none remain.
pub struct GroupChunks<'a> {
    /// Group that tasks are claimed from
    group: &'a Group,
    /// Maximum number of tasks per returned Chunk
    chunk_size: usize,
}
261
262impl<'a> Iterator for GroupChunks<'a> {
263 type Item = Chunk<'a>;
264
265 /// Get the next chunk of tasks to be executed
266 fn next(&mut self) -> Option<Chunk<'a>> {
267 self.group.get_chunk(self.chunk_size)
268 }
269}
270
/// A chunk of tasks from a Group to be executed
///
/// Executes task in the range [start, end)
#[derive(Debug)]
pub struct Chunk<'a> {
    /// The first task to be executed in this chunk
    start: i32,
    /// One past the last task to be executed in this chunk (exclusive bound)
    end: i32,
    /// Total number of tasks scheduled in the group this chunk came from
    total: (i32, i32, i32),
    /// Function to run for this task
    fcn: ISPCTaskFn,
    /// Data pointer to user params to pass to the function
    data: AtomicPtr<libc::c_void>,
    /// The group this chunk is running tasks from
    group: &'a Group,
}
289
290impl Chunk<'_> {
291 /// Create a new chunk to execute tasks in the group from [start, end)
292 pub fn new(group: &Group, start: usize, end: usize) -> Chunk<'_> {
293 let d = AtomicPtr::new(group.data.load(atomic::Ordering::SeqCst));
294 Chunk {
295 start: start as i32,
296 end: end as i32,
297 total: group.total,
298 fcn: group.fcn,
299 data: d,
300 group,
301 }
302 }
303 /// Execute all tasks in this chunk
304 pub fn execute(&self, thread_id: i32, total_threads: i32) {
305 let total_tasks = self.total.0 * self.total.1 * self.total.2;
306 let data = self.data.load(atomic::Ordering::SeqCst);
307 for t in self.start..self.end {
308 let id = self.task_indices(t);
309 (self.fcn)(
310 data,
311 thread_id as libc::c_int,
312 total_threads as libc::c_int,
313 t as libc::c_int,
314 total_tasks as libc::c_int,
315 id.0 as libc::c_int,
316 id.1 as libc::c_int,
317 id.2 as libc::c_int,
318 self.total.0 as libc::c_int,
319 self.total.1 as libc::c_int,
320 self.total.2 as libc::c_int,
321 );
322 }
323 // Tell the group this chunk is done
324 self.group
325 .chunks_finished
326 .fetch_add(1, atomic::Ordering::SeqCst);
327 }
328 /// Get the global task id for the task index
329 fn task_indices(&self, id: i32) -> (i32, i32, i32) {
330 (
331 id % self.total.0,
332 (id / self.total.0) % self.total.1,
333 id / (self.total.0 * self.total.1),
334 )
335 }
336}