open_cl_low_level/
event.rs

1use std::mem::ManuallyDrop;
2use std::time::Duration;
3
4use crate::{
5    build_output, ClCommandQueue, ClContext, ClNumber, ClPointer, CommandExecutionStatus,
6    Error, EventInfo, Output, ProfilingInfo, Waitlist, ObjectWrapper,
7};
8
9use crate::ffi::{
10    clGetEventInfo, clGetEventProfilingInfo, cl_command_queue, cl_context, cl_event, cl_event_info,
11    cl_profiling_info, cl_ulong,
12};
13
14use crate::cl_helpers::cl_get_info5;
15
16// NOTE: Fix cl_profiling_info arg // should be a bitflag or enum.
17pub unsafe fn cl_get_event_profiling_info(
18    event: cl_event,
19    info_flag: cl_profiling_info,
20) -> Output<u64> {
21    let mut time: cl_ulong = 0;
22    let err_code = clGetEventProfilingInfo(
23        event,
24        info_flag,
25        std::mem::size_of::<cl_ulong>() as libc::size_t,
26        (&mut time as *mut u64) as *mut libc::c_void,
27        std::ptr::null_mut(),
28    );
29    build_output(time as u64, err_code)
30}
31
32pub unsafe fn cl_get_event_info<T: Copy>(
33    event: cl_event,
34    info_flag: cl_event_info,
35) -> Output<ClPointer<T>> {
36    cl_get_info5(event, info_flag, clGetEventInfo)
37}
38
39pub type ClEvent = ObjectWrapper<cl_event>;
40
41pub unsafe trait EventPtr: Sized {
42    unsafe fn event_ptr(&self) -> cl_event;
43}
44
45unsafe impl EventPtr for cl_event {
46    unsafe fn event_ptr(&self) -> cl_event {
47        *self
48    }
49}
50
51unsafe impl EventPtr for ClEvent {
52    unsafe fn event_ptr(&self) -> cl_event {
53        self.cl_object()
54    }
55}
56
57unsafe impl EventPtr for &ClEvent {
58    unsafe fn event_ptr(&self) -> cl_event {
59        self.cl_object()
60    }
61}
62
63/// An error related to an Event or WaitList.
64#[derive(Debug, Fail, PartialEq, Eq, Clone)]
65pub enum EventError {
66    #[fail(display = "Encountered a null cl_event.")]
67    ClEventCannotBeNull,
68
69    #[fail(display = "Event was already consumed. {:?}", _0)]
70    EventAlreadyConsumed(String),
71}
72
73
74impl ClEvent {
75    pub fn time(&self, info: ProfilingInfo) -> Output<u64> {
76        unsafe { cl_get_event_profiling_info(self.event_ptr(), info.into()) }
77    }
78
79    pub fn queue_time(&self) -> Output<u64> {
80        self.time(ProfilingInfo::Queued)
81    }
82
83    pub fn submit_time(&self) -> Output<u64> {
84        self.time(ProfilingInfo::Submit)
85    }
86
87    pub fn start_time(&self) -> Output<u64> {
88        self.time(ProfilingInfo::Start)
89    }
90
91    pub fn end_time(&self) -> Output<u64> {
92        self.time(ProfilingInfo::End)
93    }
94
95    pub fn profiling(&self) -> Profiling {
96        Profiling {
97            submit_time: self.submit_time().ok(),
98            queue_time: self.queue_time().ok(),
99            start_time: self.start_time().ok(),
100            end_time: self.end_time().ok(),
101        }
102    }
103
104    pub unsafe fn info<T: Copy>(&self, flag: EventInfo) -> Output<ClPointer<T>> {
105        cl_get_event_info::<T>(self.event_ptr(), flag.into())
106    }
107
108    pub fn reference_count(&self) -> Output<u32> {
109        unsafe {
110            self.info(EventInfo::ReferenceCount)
111                .map(|ret| ret.into_one())
112        }
113    }
114
115    pub unsafe fn cl_command_queue(&self) -> Output<cl_command_queue> {
116        self.info(EventInfo::CommandQueue)
117            .map(|cl_ptr| cl_ptr.into_one())
118    }
119
120    pub unsafe fn command_queue(&self) -> Output<ClCommandQueue> {
121        self.cl_command_queue()
122            .and_then(|cq| ClCommandQueue::retain_new(cq))
123    }
124
125    pub unsafe fn cl_context(&self) -> Output<cl_context> {
126        self.info(EventInfo::Context)
127            .map(|cl_ptr| cl_ptr.into_one())
128    }
129
130    pub unsafe fn context(&self) -> Output<ClContext> {
131        self.cl_context().and_then(|ctx| ClContext::retain_new(ctx))
132    }
133
134    pub fn command_execution_status(&self) -> Output<CommandExecutionStatus> {
135        unsafe {
136            self.info(EventInfo::CommandExecutionStatus)
137                .map(|ret| ret.into_one())
138        }
139    }
140}
141
/// Snapshot of the four profiling counters of one event.
///
/// Each field is `None` when the corresponding counter could not be read.
/// The duration helpers interpret the values as nanoseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Profiling {
    /// Counter taken when the command was queued.
    pub queue_time: Option<u64>,
    /// Counter taken when the command was submitted.
    pub submit_time: Option<u64>,
    /// Counter taken when the command started executing.
    pub start_time: Option<u64>,
    /// Counter taken when the command finished executing.
    pub end_time: Option<u64>,
}

impl Profiling {
    /// Time from being queued until execution finished (`end - queue`).
    ///
    /// Returns `None` if either counter is missing or `end` precedes `queue`.
    pub fn total_duration(&self) -> Option<Duration> {
        Self::elapsed(self.queue_time, self.end_time)
    }

    /// Time spent in the queue before submission (`submit - queue`).
    ///
    /// Returns `None` if either counter is missing or out of order.
    pub fn duration_waiting_in_queue(&self) -> Option<Duration> {
        Self::elapsed(self.queue_time, self.submit_time)
    }

    /// Time between submission and start of execution (`start - submit`).
    ///
    /// Returns `None` if either counter is missing or out of order.
    pub fn duration_between_submit_and_start(&self) -> Option<Duration> {
        Self::elapsed(self.submit_time, self.start_time)
    }

    /// Time spent executing (`end - start`).
    ///
    /// Returns `None` if either counter is missing or out of order.
    pub fn duration_of_execution(&self) -> Option<Duration> {
        Self::elapsed(self.start_time, self.end_time)
    }

    /// Nanoseconds from `from` to `to` as a `Duration`.
    ///
    /// `checked_sub` turns an out-of-order pair into `None` instead of an
    /// overflow panic (debug) or a wrapped, nonsensical duration (release).
    fn elapsed(from: Option<u64>, to: Option<u64>) -> Option<Duration> {
        let nanos = to?.checked_sub(from?)?;
        Some(Duration::from_nanos(nanos))
    }
}
165
166pub struct BufferReadEvent<T: ClNumber> {
167    event: ManuallyDrop<ClEvent>,
168    host_buffer: ManuallyDrop<Option<Vec<T>>>,
169    is_consumed: bool,
170}
171
172impl<T: ClNumber> BufferReadEvent<T> {
173    pub fn new(event: ClEvent, host_buffer: Option<Vec<T>>) -> BufferReadEvent<T> {
174        BufferReadEvent {
175            event: ManuallyDrop::new(event),
176            host_buffer: ManuallyDrop::new(host_buffer),
177            is_consumed: false,
178        }
179    }
180
181    pub fn wait(&mut self) -> Output<Option<Vec<T>>> {
182        if self.is_consumed {
183            return Err(Error::EventError(EventError::EventAlreadyConsumed(
184                self.event.address(),
185            )));
186        }
187        unsafe {
188            self.event.wait()?;
189            match *self.host_buffer {
190                Some(_) => {
191                    let mut output = Some(vec![]);
192                    std::mem::swap(&mut *self.host_buffer, &mut output);
193                    self.is_consumed = true;
194                    Ok(output)
195                }
196                None => Ok(None),
197            }
198        }
199    }
200}
201
202impl<T: ClNumber> Drop for BufferReadEvent<T> {
203    fn drop(&mut self) {
204        unsafe {
205            self.event.wait().unwrap();
206            ManuallyDrop::drop(&mut self.event);
207            ManuallyDrop::drop(&mut self.host_buffer);
208        }
209    }
210}
211
212/// A CompleteEvent is the result of making a synchronous ffi call.
213///
214/// After the `cl_event`'s event is over the event is no longer able
215///
216/// A CompleteEvent is not for putting into WaitList.
217///
218/// Don't do it. You'll segfault.
219pub struct CompleteEvent {
220    event: ClEvent,
221    _unconstructable: (),
222}
223
224impl CompleteEvent {
225    pub fn new(event: ClEvent) -> CompleteEvent {
226        CompleteEvent {
227            event,
228            _unconstructable: (),
229        }
230    }
231
232    pub unsafe fn inner(&self) -> &ClEvent {
233        &self.event
234    }
235}
236
#[cfg(test)]
mod tests {
    use crate::{
        BufferCreator, ClCommandQueue, ClContext, ClEvent, ClKernel, ClMem, CommandExecutionStatus,
        Session, SessionBuilder, Work,
    };
    use std::time::Duration;

    const SRC: &str = "
    __kernel void add_one(__global uint *data) {
        data[get_global_id(0)] += 1;
    }
    ";

    /// Builds a session, enqueues the `add_one` kernel once and returns the
    /// resulting event. The session is returned as well so that it outlives
    /// the event in each test.
    fn get_event() -> (Session, ClEvent) {
        unsafe {
            let mut session: Session = SessionBuilder::new().with_program_src(SRC).build().unwrap();
            let mut kernel =
                ClKernel::create(session.program(), "add_one").expect("Failed to Kernel::create/2");
            let input_data: Vec<u64> = vec![1, 2, 3];
            let data = &input_data[..];
            let mem_cfg = data.mem_config();
            let mut mem_buffer: ClMem =
                ClMem::create_with_config(session.context(), data, mem_cfg)
                    .unwrap_or_else(|e| panic!("Failed to ClMem::create_with_config() {:?}", e));
            let () = kernel
                .set_arg(0, &mut mem_buffer)
                .expect("Failed to set_arg(0, &mem_buffer)");
            let work = Work::new(input_data.len());
            let event = session
                .enqueue_kernel(0, &mut kernel, &work, None)
                .unwrap_or_else(|e| panic!("Failed to enqueue_kernel {:?}", e));
            (session, event)
        }
    }

    /// Asserts `actual < limit`, naming the measured quantity in the failure
    /// message. Keeping label, value and limit in one call prevents the
    /// message from reporting a different limit than the one being checked.
    fn assert_shorter_than(label: &str, actual: Duration, limit: Duration) {
        assert!(
            actual < limit,
            "{} {:?} was not below the max duration {:?}",
            label,
            actual,
            limit
        );
    }

    #[test]
    fn event_method_queue_time_works() {
        let (_sess, event) = get_event();
        let output = event
            .queue_time()
            .expect("Failed to call event.queue_time()");
        assert!(output > 0);
    }

    #[test]
    fn event_method_submit_time_works() {
        let (_sess, event) = get_event();
        let output = event
            .submit_time()
            .expect("Failed to call event.submit_time()");
        assert!(output > 0);
    }

    #[test]
    fn event_method_start_time_works() {
        let (_sess, event) = get_event();
        let output = event
            .start_time()
            .expect("Failed to call event.start_time()");
        assert!(output > 0);
    }

    #[test]
    fn event_method_end_time_works() {
        let (_sess, event) = get_event();
        let output = event.end_time().expect("Failed to call event.end_time()");
        assert!(output > 0);
    }

    #[test]
    fn event_method_reference_count_works() {
        let (_sess, event) = get_event();
        let output = event
            .reference_count()
            .expect("Failed to call event.reference_count()");
        assert_eq!(output, 1);
    }

    #[test]
    fn event_method_command_queue_works() {
        let (_sess, event) = get_event();
        let _output: ClCommandQueue =
            unsafe { event.command_queue() }.expect("Failed to call event.command_queue()");
    }

    #[test]
    fn event_method_context_works() {
        let (_sess, event) = get_event();
        let _output: ClContext =
            unsafe { event.context() }.expect("Failed to call event.context()");
    }

    #[test]
    fn event_method_command_execution_status_works() {
        let (_sess, event) = get_event();
        let _output: CommandExecutionStatus = event
            .command_execution_status()
            .expect("Failed to call event.command_execution_status()");
    }

    #[test]
    fn event_profiling_works() {
        let (_sess, event) = get_event();
        let exec_status: CommandExecutionStatus = event
            .command_execution_status()
            .expect("Failed to call event.command_execution_status()");
        assert_eq!(exec_status, CommandExecutionStatus::Complete);

        let prof = event.profiling();
        let submitted_at = prof.submit_time.unwrap();
        let queued_at = prof.queue_time.unwrap();
        let started_at = prof.start_time.unwrap();
        let ended_at = prof.end_time.unwrap();
        assert!(queued_at < submitted_at);
        assert!(queued_at < started_at);
        assert!(started_at < ended_at);

        assert_shorter_than(
            "total",
            prof.total_duration().unwrap(),
            Duration::from_millis(10),
        );
        assert_shorter_than(
            "duration_waiting_in_queue",
            prof.duration_waiting_in_queue().unwrap(),
            Duration::from_millis(5),
        );
        assert_shorter_than(
            "duration_between_submit_and_start",
            prof.duration_between_submit_and_start().unwrap(),
            Duration::from_millis(5),
        );
        assert_shorter_than(
            "duration_of_execution",
            prof.duration_of_execution().unwrap(),
            Duration::from_millis(5),
        );
    }
}
390}