opencl_core/
command_queue.rs

1use std::fmt;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
4
5use crate::{
6    Buffer, ClNumber, CommandQueueOptions, CommandQueueProperties, Context, Device, Kernel, Output,
7    Waitlist, Work, NumberTyped,
8};
9
10use crate::ll::{ClCommandQueue, ClContext, ClDeviceID, CommandQueuePtr, ContextPtr, DevicePtr};
11
12pub trait CommandQueueLock<P>
13where
14    P: CommandQueuePtr,
15{
16    unsafe fn write_lock(&self) -> RwLockWriteGuard<P>;
17    unsafe fn read_lock(&self) -> RwLockReadGuard<P>;
18    unsafe fn rw_lock(&self) -> &RwLock<P>;
19
20    fn address(&self) -> String {
21        unsafe {
22            let read_lock = self.read_lock();
23            let ptr = read_lock.command_queue_ptr();
24            format!("{:?}", ptr)
25        }
26    }
27}
28
/// A lock-guarded low-level command queue stored together with the low-level context and
/// device it was built with.
///
/// Every owning field is wrapped in `ManuallyDrop`; the `Drop` impl for this type releases
/// them explicitly, in the order queue, context, device.
pub struct CommandQueue {
    /// The low-level queue behind an `Arc<RwLock<..>>`; `Clone` shares this `Arc`.
    _queue: ManuallyDrop<Arc<RwLock<ClCommandQueue>>>,
    /// The low-level context stored alongside the queue.
    _context: ManuallyDrop<ClContext>,
    /// The low-level device stored alongside the queue.
    _device: ManuallyDrop<ClDeviceID>,
    /// Private unit field. NOTE(review): presumably here to block struct-literal
    /// construction outside this module — confirm intent.
    _unconstructable: (),
}
35
36impl CommandQueueLock<ClCommandQueue> for CommandQueue {
37    unsafe fn read_lock(&self) -> RwLockReadGuard<ClCommandQueue> {
38        (*self._queue).read().unwrap()
39    }
40    unsafe fn write_lock(&self) -> RwLockWriteGuard<ClCommandQueue> {
41        (*self._queue).write().unwrap()
42    }
43    unsafe fn rw_lock(&self) -> &RwLock<ClCommandQueue> {
44        &(*self._queue)
45    }
46}
47
48impl fmt::Debug for CommandQueue {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        write!(f, "CommandQueue{{{:?}}}", self.address())
51    }
52}
53
54impl Drop for CommandQueue {
55    fn drop(&mut self) {
56        unsafe {
57            debug!("cl_command_queue {:?} - CommandQueue::drop", self.address());
58            ManuallyDrop::drop(&mut self._queue);
59            ManuallyDrop::drop(&mut self._context);
60            ManuallyDrop::drop(&mut self._device);
61        }
62    }
63}
64
65impl Clone for CommandQueue {
66    fn clone(&self) -> CommandQueue {
67        CommandQueue {
68            _queue: ManuallyDrop::new((*self._queue).clone()),
69            _context: self._context.clone(),
70            _device: self._device.clone(),
71            _unconstructable: (),
72        }
73    }
74}
75
// NOTE(review): these impls assert that a `CommandQueue` (and the low-level queue, context
// and device it wraps) may be moved to and shared between threads. Nothing in this file
// demonstrates that — confirm against the guarantees of the wrapped `ll` types.
unsafe impl Send for CommandQueue {}
unsafe impl Sync for CommandQueue {}
78
79impl CommandQueue {
80    /// Builds a CommandQueue from a low-level ClCommandQueue, a Context and a Device.
81    ///
82    /// # Safety
83    /// Building a CommandQueue with any invalid ClObject, or mismatched ClObjects is undefined behavior.
84    unsafe fn new(queue: ClCommandQueue, context: &Context, device: &Device) -> CommandQueue {
85        CommandQueue::new_from_low_level(
86            queue,
87            context.low_level_context(),
88            device.low_level_device(),
89        )
90    }
91
92    /// Builds a CommandQueue from a low-level ClObjects
93    ///
94    /// # Safety
95    /// Building a CommandQueue with any invalid ClObject, or mismatched ClObjects is undefined behavior.
96    unsafe fn new_from_low_level(
97        queue: ClCommandQueue,
98        context: &ClContext,
99        device: &ClDeviceID,
100    ) -> CommandQueue {
101        CommandQueue {
102            _queue: ManuallyDrop::new(Arc::new(RwLock::new(queue))),
103            _context: ManuallyDrop::new(context.clone()),
104            _device: ManuallyDrop::new(device.clone()),
105            _unconstructable: (),
106        }
107    }
108
109    /// Creates a new CommandQueue with the given Context on the given Device.
110    pub fn create(
111        context: &Context,
112        device: &Device,
113        opt_props: Option<CommandQueueProperties>,
114    ) -> Output<CommandQueue> {
115        unsafe {
116            let ll_queue = ClCommandQueue::create(
117                context.low_level_context(),
118                device.low_level_device(),
119                opt_props,
120            )?;
121            Ok(CommandQueue::new(ll_queue, context, device))
122        }
123    }
124
125    /// Creates a new copy of a CommandQueue with CommandQueue's Context on the CommandQueue's Device.
126    ///
127    /// This function is useful for executing concurrent operations on a device within the same
128    /// Context.
129    pub fn create_copy(&self) -> Output<CommandQueue> {
130        unsafe {
131            let props = self.properties()?;
132            let ll_queue = ClCommandQueue::create_from_raw_pointers(
133                (*self._context).context_ptr(),
134                (*self._device).device_ptr(),
135                props.into(),
136            )?;
137            Ok(CommandQueue::new_from_low_level(
138                ll_queue,
139                &self._context,
140                &self._device,
141            ))
142        }
143    }
144
145    /// The low-level context of the CommandQueue
146    pub fn low_level_context(&self) -> ClContext {
147        (*self._context).clone()
148    }
149
150    pub fn low_level_device(&self) -> ClDeviceID {
151        (*self._device).clone()
152    }
153
154    /// write_buffer is used to move data from the host buffer (buffer: &[T]) to
155    /// the OpenCL cl_mem pointer inside `d_mem: &Buffer<T>`.
156    pub fn write_buffer<T: ClNumber>(
157        &self,
158        device_buffer: &Buffer,
159        host_buffer: &[T],
160        opts: Option<CommandQueueOptions>,
161    ) -> Output<()>
162    {   
163
164        unsafe {
165            let mut qlock = self.write_lock();
166            let mut buf_lock = device_buffer.write_lock();
167            let event = qlock.write_buffer(&mut *buf_lock, host_buffer, opts.into())?;
168            event.wait()
169        }
170    }
171
172    /// read_buffer is used to move data from the `device_mem` (`cl_mem` pointer
173    /// inside `&DeviceMem<T>`) into a `host_buffer` (`&mut [T]`).
174    pub fn read_buffer<T: ClNumber>(
175        &self,
176        device_buffer: &Buffer,
177        host_buffer: &mut [T],
178        opts: Option<CommandQueueOptions>,
179    ) -> Output<Option<Vec<T>>> {
180        unsafe {
181            device_buffer.number_type().type_check(T::number_type())?;
182            let mut qlock = self.write_lock();
183            let buf_lock = device_buffer.read_lock();
184            let mut event = qlock.read_buffer(&*buf_lock, host_buffer, opts)?;
185            event.wait()
186        }
187    }
188
189    pub fn enqueue_kernel(
190        &self,
191        kernel: Kernel,
192        work: &Work,
193        opts: Option<CommandQueueOptions>,
194    ) -> Output<()> {
195        unsafe {
196            let mut kernel_lock = kernel.write_lock();
197            let mut qlock = self.write_lock();
198            let event = qlock.enqueue_kernel(&mut (*kernel_lock), work, opts)?;
199            event.wait()
200        }
201    }
202
203    pub fn finish(&self) -> Output<()> {
204        unsafe {
205            let mut lock = self.write_lock();
206            lock.finish()
207        }
208    }
209
210    pub fn reference_count(&self) -> Output<u32> {
211        unsafe { self.read_lock().reference_count() }
212    }
213
214    pub fn properties(&self) -> Output<CommandQueueProperties> {
215        unsafe { self.read_lock().properties() }
216    }
217}
218
219impl PartialEq for CommandQueue {
220    fn eq(&self, other: &Self) -> bool {
221        unsafe {
222            std::ptr::eq(
223                self.read_lock().command_queue_ptr(),
224                other.read_lock().command_queue_ptr(),
225            )
226        }
227    }
228}
229
230impl Eq for CommandQueue {}
231
#[cfg(test)]
mod tests {
    use crate::ll::{ClContext, ClDeviceID, CommandQueueProperties, CommandQueuePtr};
    use crate::testing;

    // Kernel source handed to `testing::get_session` by every test in this module.
    const SRC: &str = "
    __kernel void test(__global int *i) {
        *i += 1;
    }";

    /// The session's queue can report its context.
    #[test]
    fn command_queue_method_context_works() {
        // testing::init_logger();
        let session = testing::get_session(SRC);
        let outcome = unsafe { session.read_queue().context() };
        let _context: ClContext = outcome.unwrap();
    }

    /// The session's queue can report its device.
    #[test]
    fn command_queue_method_device_works() {
        let session = testing::get_session(SRC);
        let outcome = unsafe { session.read_queue().device() };
        let _device: ClDeviceID = outcome.unwrap();
    }

    /// A freshly obtained session queue reports a reference count of 1.
    #[test]
    fn command_queue_method_reference_count_works() {
        let session = testing::get_session(SRC);
        let outcome = unsafe { session.read_queue().reference_count() };
        let ref_count: u32 = outcome.expect("CommandQueue method reference_count() failed");
        assert_eq!(ref_count, 1);
    }

    /// The reported properties round-trip through `CommandQueueProperties::from_bits`.
    #[test]
    fn command_queue_method_properties_works() {
        let session = testing::get_session(SRC);
        let outcome = unsafe { session.read_queue().properties() };
        let props: CommandQueueProperties =
            outcome.expect("CommandQueue method properties() failed");
        let bits = props.bits();
        if CommandQueueProperties::from_bits(bits).is_none() {
            panic!(
                "
                CommandQueue method properties returned \
                an invalid CommandQueueProperties bitflag {:?}\
                ",
                bits
            );
        }
    }

    /// `create_copy` yields a queue whose raw pointer differs from the original's.
    #[test]
    fn command_queue_copy_new_works() {
        let session = testing::get_session(SRC);
        unsafe {
            let copied = session.read_queue().create_copy().unwrap();
            assert!(copied.command_queue_ptr() != session.read_queue().command_queue_ptr());
        }
    }
}