ocl_extras/command_graph.rs

//! A command requisite-dependency graph.
//!
//! ### Queues and Deadlocks
//!
//! When offloading host-side processing to a thread pool or foreign thread,
//! a few important guidelines must be kept in mind to avoid deadlocks. This
//! usually applies to unmap or write commands but could equally apply to
//! any other command which triggers a delayed-user-callback chain.
//!
//! Let's say we have a host-mapped buffer (`MemMap`); we need to write data
//! to the mapped area and then enqueue an unmap moving that data to a
//! device... [TODO: FINISH THIS]
//!
//! When an offloaded task completes, it will inevitably enqueue another
//! command or trigger some other action affecting commands which are
//! **already** in a command queue.
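//!
//! As a rough, OpenCL-free illustration of this failure mode (a minimal
//! sketch using only `std`, not the `ocl` API): if the thread which must
//! perform a completion step is itself blocked waiting on that completion,
//! nothing can ever proceed.
//!
//! ```no_run
//! use std::sync::mpsc;
//!
//! let (tx, rx) = mpsc::channel::<()>();
//!
//! // Suppose this `send` stands in for the host-side completion step
//! // (e.g. the unmap) which can only run on this thread *after* `recv`
//! // returns. `recv` then blocks forever: a self-inflicted deadlock.
//! rx.recv().unwrap();
//! tx.send(()).unwrap();
//! ```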

use std::cell::{Cell, RefCell, Ref};
use std::collections::{HashMap, BTreeSet};
use ocl::{Event, EventList};

pub struct RwCmdIdxs {
    writers: Vec<usize>,
    readers: Vec<usize>,
}

impl RwCmdIdxs {
    fn new() -> RwCmdIdxs {
        RwCmdIdxs { writers: Vec::new(), readers: Vec::new() }
    }
}

#[allow(dead_code)]
pub struct KernelArgBuffer {
    arg_idx: usize, // Will be used when refreshing kernels after defragging or resizing.
    buffer_id: usize,
}

impl KernelArgBuffer {
    pub fn new(arg_idx: usize, buffer_id: usize) -> KernelArgBuffer {
        KernelArgBuffer { arg_idx, buffer_id }
    }
}


/// Details of a queueable command.
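///
/// For example, a copy reads from its `source` block and writes to its
/// `target` block (block ids here are arbitrary):
///
/// ```ignore
/// let copy = CommandDetails::Copy { source: 0, target: 1 };
/// assert_eq!(copy.sources(), vec![0]);
/// assert_eq!(copy.targets(), vec![1]);
/// ```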
pub enum CommandDetails {
    Fill { target: usize },
    Write { target: usize },
    Read { source: usize },
    Copy { source: usize, target: usize },
    Kernel { id: usize, sources: Vec<KernelArgBuffer>, targets: Vec<KernelArgBuffer> },
}

impl CommandDetails {
    /// Returns the ids of the memory blocks which this command reads from.
    pub fn sources(&self) -> Vec<usize> {
        match *self {
            CommandDetails::Fill { .. } => vec![],
            CommandDetails::Read { source } => vec![source],
            // A write's source is host memory, which isn't a tracked block:
            CommandDetails::Write { .. } => vec![],
            CommandDetails::Copy { source, .. } => vec![source],
            CommandDetails::Kernel { ref sources, .. } => {
                sources.iter().map(|arg| arg.buffer_id).collect()
            },
        }
    }

    /// Returns the ids of the memory blocks which this command writes to.
    pub fn targets(&self) -> Vec<usize> {
        match *self {
            CommandDetails::Fill { target } => vec![target],
            // A read's target is host memory, which isn't a tracked block:
            CommandDetails::Read { .. } => vec![],
            CommandDetails::Write { target } => vec![target],
            CommandDetails::Copy { target, .. } => vec![target],
            CommandDetails::Kernel { ref targets, .. } => {
                targets.iter().map(|arg| arg.buffer_id).collect()
            },
        }
    }
}


pub struct Command {
    details: CommandDetails,
    // The command's completion event for the current cycle:
    event: RefCell<Option<Event>>,
    // Events this command must wait on before being enqueued:
    requisite_events: RefCell<EventList>,
}

impl Command {
    pub fn new(details: CommandDetails) -> Command {
        Command {
            details,
            event: RefCell::new(None),
            requisite_events: RefCell::new(EventList::new()),
        }
    }

    /// Returns the indices of all commands which precede this command and
    /// which write to a block of memory which this command reads from.
    pub fn preceding_writers(&self, cmds: &HashMap<usize, RwCmdIdxs>) -> BTreeSet<usize> {
        self.details.sources().iter().flat_map(|cmd_src_block|
            cmds.get(cmd_src_block).unwrap().writers.iter().cloned()).collect()
    }

    /// Returns the indices of all commands which follow this command and
    /// which read from a block of memory which this command writes to.
    pub fn following_readers(&self, cmds: &HashMap<usize, RwCmdIdxs>) -> BTreeSet<usize> {
        self.details.targets().iter().flat_map(|cmd_tar_block|
            cmds.get(cmd_tar_block).unwrap().readers.iter().cloned()).collect()
    }

    pub fn details(&self) -> &CommandDetails { &self.details }
}


/// A directional sequence dependency graph representing the temporal
/// requirements of each asynchronous read, write, copy, and kernel command
/// for a particular task.
///
/// Obviously this is overkill for this example, but this graph is flexible
/// enough to schedule execution correctly and optimally with arbitrarily
/// many parallel tasks with arbitrary-duration reads, writes, and kernels.
///
/// Note that in this example we are using `buffer_id`, a `usize`, to
/// represent memory regions (because that's what the allocator above is
/// using), but we could easily use multi-part, complex identifiers/keys.
/// For example, we may have a program with a large number of buffers which
/// are organized into a complex hierarchy or some other arbitrary
/// structure. We could swap `buffer_id` for some value representing that,
/// as long as the identifier used could uniquely identify each subsection
/// of memory. We could also use ranges of values, performing an overlap
/// check, to get byte-level precision.
///
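/// A minimal construction sketch (arbitrary buffer ids; uses only the
/// types defined in this module):
///
/// ```ignore
/// let mut graph = CommandGraph::new();
///
/// // Host data is written into buffer 0, a kernel reads 0 and writes 1,
/// // and a host read drains buffer 1:
/// graph.add(Command::new(CommandDetails::Write { target: 0 })).unwrap();
/// graph.add(Command::new(CommandDetails::Kernel {
///     id: 0,
///     sources: vec![KernelArgBuffer::new(0, 0)],
///     targets: vec![KernelArgBuffer::new(1, 1)],
/// })).unwrap();
/// graph.add(Command::new(CommandDetails::Read { source: 1 })).unwrap();
///
/// // Lock the graph and wire up each command's requisites:
/// graph.populate_requisites();
/// ```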
pub struct CommandGraph {
    commands: Vec<Command>,
    command_requisites: Vec<Vec<usize>>,
    // `(start endpoints, finish endpoints)`:
    ends: (Vec<usize>, Vec<usize>),
    locked: bool,
    next_cmd_idx: Cell<usize>,
}

impl CommandGraph {
    /// Returns a new, empty graph.
    pub fn new() -> CommandGraph {
        CommandGraph {
            commands: Vec::new(),
            command_requisites: Vec::new(),
            ends: (Vec::new(), Vec::new()),
            locked: false,
            next_cmd_idx: Cell::new(0),
        }
    }

    /// Adds a new command and returns the command index if successful.
    ///
    /// Returns an error if the graph has already been locked by a call to
    /// `::populate_requisites`.
    pub fn add(&mut self, command: Command) -> Result<usize, ()> {
        if self.locked { return Err(()); }
        self.commands.push(command);
        self.command_requisites.push(Vec::new());
        Ok(self.commands.len() - 1)
    }

    /// Returns a map from each sub-buffer id to the indices of every
    /// command which reads from or writes to it.
    fn readers_and_writers_by_buffer(&self) -> HashMap<usize, RwCmdIdxs> {
        let mut cmds = HashMap::new();

        for (cmd_idx, cmd) in self.commands.iter().enumerate() {
            for cmd_src_block in cmd.details.sources() {
                let rw_cmd_idxs = cmds.entry(cmd_src_block)
                    .or_insert_with(RwCmdIdxs::new);

                rw_cmd_idxs.readers.push(cmd_idx);
            }

            for cmd_tar_block in cmd.details.targets() {
                let rw_cmd_idxs = cmds.entry(cmd_tar_block)
                    .or_insert_with(RwCmdIdxs::new);

                rw_cmd_idxs.writers.push(cmd_idx);
            }
        }

        cmds
    }

    /// Populates the list of requisite commands necessary for each command.
    ///
    /// Requisite commands (preceding writers and following readers) for a
    /// command are those which are causally linked and must come either
    /// directly before or directly after it. By determining whether or not
    /// a command comes directly before or after another, we can establish
    /// the causal/temporal relationship between any two nodes on the graph.
    ///
    /// Nodes without any preceding writers or following readers are start
    /// or finish endpoints, respectively. It's possible for a graph to have
    /// no endpoints, in which case the graph is closed and at least
    /// partially cyclical.
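    ///
    /// For instance, in a Write -> Kernel -> Read chain like the one
    /// sketched in the type-level docs above, the kernel's requisites are
    /// both the write (its preceding writer on buffer `0`) and the read
    /// (its following reader on buffer `1`), while the write and the read
    /// are the start and finish endpoints, respectively.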
    pub fn populate_requisites(&mut self) {
        let cmds = self.readers_and_writers_by_buffer();

        for (cmd_idx, cmd) in self.commands.iter().enumerate() {
            assert!(self.command_requisites[cmd_idx].is_empty());

            // Get all commands which must precede the current `cmd`.
            let preceding_writers = cmd.preceding_writers(&cmds);

            // If there are none, `cmd` is a start endpoint.
            if preceding_writers.is_empty() { self.ends.0.push(cmd_idx); }

            // Otherwise add them to the list of requisites.
            for &req_cmd_idx in preceding_writers.iter() {
                self.command_requisites[cmd_idx].push(req_cmd_idx);
            }

            // Get all commands which must follow the current `cmd`.
            let following_readers = cmd.following_readers(&cmds);

            // If there are none, `cmd` is a finish endpoint.
            if following_readers.is_empty() { self.ends.1.push(cmd_idx); }

            // Otherwise add them to the list of requisites.
            for &req_cmd_idx in following_readers.iter() {
                self.command_requisites[cmd_idx].push(req_cmd_idx);
            }

            self.command_requisites[cmd_idx].shrink_to_fit();
        }

        self.commands.shrink_to_fit();
        self.command_requisites.shrink_to_fit();
        self.locked = true;
    }

    /// Returns the list of requisite events for a command.
    ///
    /// Commands must be processed in index order, once each per cycle:
    /// `get_req_events` rejects requests made out of order.
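    ///
    /// A sketch of a typical enqueue cycle (`enqueue_cmd` here is a
    /// hypothetical function which enqueues the command's work, waiting on
    /// the requisite events, and returns its completion `Event`):
    ///
    /// ```ignore
    /// for cmd_idx in 0..graph.commands().len() {
    ///     let event = {
    ///         let reqs = graph.get_req_events(cmd_idx).unwrap();
    ///         enqueue_cmd(cmd_idx, &reqs)
    ///     };
    ///     graph.set_cmd_event(cmd_idx, event).unwrap();
    /// }
    /// ```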
    pub fn get_req_events(&self, cmd_idx: usize) -> Result<Ref<EventList>, &'static str> {
        if !self.locked { return Err("Call '::populate_requisites' first."); }
        if self.next_cmd_idx.get() != cmd_idx { return Err("Command events requested out of order."); }

        // Clear any events lingering from the previous cycle:
        self.commands[cmd_idx].requisite_events.borrow_mut().clear();

        for &req_idx in self.command_requisites[cmd_idx].iter() {
            let event_opt = self.commands[req_idx].event.borrow().clone();

            if let Some(event) = event_opt {
                self.commands[cmd_idx].requisite_events.borrow_mut().push(event);
            }
        }

        Ok(self.commands[cmd_idx].requisite_events.borrow())
    }

    /// Sets the event associated with the completion of a command.
    pub fn set_cmd_event(&self, cmd_idx: usize, event: Event) -> Result<(), &'static str> {
        if !self.locked { return Err("Call '::populate_requisites' first."); }

        *self.commands[cmd_idx].event.borrow_mut() = Some(event);

        // Advance to the next command, wrapping back around to the first
        // command at the end of each cycle:
        if (self.next_cmd_idx.get() + 1) == self.commands.len() {
            self.next_cmd_idx.set(0);
        } else {
            self.next_cmd_idx.set(self.next_cmd_idx.get() + 1);
        }

        Ok(())
    }

    /// Returns a slice containing all of the graph's commands.
    pub fn commands(&self) -> &[Command] {
        self.commands.as_slice()
    }

    /// Pushes the completion events of each finish-endpoint command onto
    /// `event_list`.
    pub fn get_finish_events(&self, event_list: &mut EventList) {
        assert!(self.next_cmd_idx.get() == 0, "Finish events can only be determined \
            for each cycle just after the graph has set its last cmd event.");

        for &cmd_idx in self.ends.1.iter() {
            let event_opt = self.commands[cmd_idx].event.borrow().clone();

            if let Some(event) = event_opt {
                event_list.push(event);
            }
        }
    }
}