opencl_core/
command_queue.rs1use std::fmt;
2use std::mem::ManuallyDrop;
3use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
4
5use crate::{
6 Buffer, ClNumber, CommandQueueOptions, CommandQueueProperties, Context, Device, Kernel, Output,
7 Waitlist, Work, NumberTyped,
8};
9
10use crate::ll::{ClCommandQueue, ClContext, ClDeviceID, CommandQueuePtr, ContextPtr, DevicePtr};
11
12pub trait CommandQueueLock<P>
13where
14 P: CommandQueuePtr,
15{
16 unsafe fn write_lock(&self) -> RwLockWriteGuard<P>;
17 unsafe fn read_lock(&self) -> RwLockReadGuard<P>;
18 unsafe fn rw_lock(&self) -> &RwLock<P>;
19
20 fn address(&self) -> String {
21 unsafe {
22 let read_lock = self.read_lock();
23 let ptr = read_lock.command_queue_ptr();
24 format!("{:?}", ptr)
25 }
26 }
27}
28
29pub struct CommandQueue {
30 _queue: ManuallyDrop<Arc<RwLock<ClCommandQueue>>>,
31 _context: ManuallyDrop<ClContext>,
32 _device: ManuallyDrop<ClDeviceID>,
33 _unconstructable: (),
34}
35
36impl CommandQueueLock<ClCommandQueue> for CommandQueue {
37 unsafe fn read_lock(&self) -> RwLockReadGuard<ClCommandQueue> {
38 (*self._queue).read().unwrap()
39 }
40 unsafe fn write_lock(&self) -> RwLockWriteGuard<ClCommandQueue> {
41 (*self._queue).write().unwrap()
42 }
43 unsafe fn rw_lock(&self) -> &RwLock<ClCommandQueue> {
44 &(*self._queue)
45 }
46}
47
48impl fmt::Debug for CommandQueue {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 write!(f, "CommandQueue{{{:?}}}", self.address())
51 }
52}
53
54impl Drop for CommandQueue {
55 fn drop(&mut self) {
56 unsafe {
57 debug!("cl_command_queue {:?} - CommandQueue::drop", self.address());
58 ManuallyDrop::drop(&mut self._queue);
59 ManuallyDrop::drop(&mut self._context);
60 ManuallyDrop::drop(&mut self._device);
61 }
62 }
63}
64
65impl Clone for CommandQueue {
66 fn clone(&self) -> CommandQueue {
67 CommandQueue {
68 _queue: ManuallyDrop::new((*self._queue).clone()),
69 _context: self._context.clone(),
70 _device: self._device.clone(),
71 _unconstructable: (),
72 }
73 }
74}
75
76unsafe impl Send for CommandQueue {}
77unsafe impl Sync for CommandQueue {}
78
79impl CommandQueue {
80 unsafe fn new(queue: ClCommandQueue, context: &Context, device: &Device) -> CommandQueue {
85 CommandQueue::new_from_low_level(
86 queue,
87 context.low_level_context(),
88 device.low_level_device(),
89 )
90 }
91
92 unsafe fn new_from_low_level(
97 queue: ClCommandQueue,
98 context: &ClContext,
99 device: &ClDeviceID,
100 ) -> CommandQueue {
101 CommandQueue {
102 _queue: ManuallyDrop::new(Arc::new(RwLock::new(queue))),
103 _context: ManuallyDrop::new(context.clone()),
104 _device: ManuallyDrop::new(device.clone()),
105 _unconstructable: (),
106 }
107 }
108
109 pub fn create(
111 context: &Context,
112 device: &Device,
113 opt_props: Option<CommandQueueProperties>,
114 ) -> Output<CommandQueue> {
115 unsafe {
116 let ll_queue = ClCommandQueue::create(
117 context.low_level_context(),
118 device.low_level_device(),
119 opt_props,
120 )?;
121 Ok(CommandQueue::new(ll_queue, context, device))
122 }
123 }
124
125 pub fn create_copy(&self) -> Output<CommandQueue> {
130 unsafe {
131 let props = self.properties()?;
132 let ll_queue = ClCommandQueue::create_from_raw_pointers(
133 (*self._context).context_ptr(),
134 (*self._device).device_ptr(),
135 props.into(),
136 )?;
137 Ok(CommandQueue::new_from_low_level(
138 ll_queue,
139 &self._context,
140 &self._device,
141 ))
142 }
143 }
144
145 pub fn low_level_context(&self) -> ClContext {
147 (*self._context).clone()
148 }
149
150 pub fn low_level_device(&self) -> ClDeviceID {
151 (*self._device).clone()
152 }
153
154 pub fn write_buffer<T: ClNumber>(
157 &self,
158 device_buffer: &Buffer,
159 host_buffer: &[T],
160 opts: Option<CommandQueueOptions>,
161 ) -> Output<()>
162 {
163
164 unsafe {
165 let mut qlock = self.write_lock();
166 let mut buf_lock = device_buffer.write_lock();
167 let event = qlock.write_buffer(&mut *buf_lock, host_buffer, opts.into())?;
168 event.wait()
169 }
170 }
171
172 pub fn read_buffer<T: ClNumber>(
175 &self,
176 device_buffer: &Buffer,
177 host_buffer: &mut [T],
178 opts: Option<CommandQueueOptions>,
179 ) -> Output<Option<Vec<T>>> {
180 unsafe {
181 device_buffer.number_type().type_check(T::number_type())?;
182 let mut qlock = self.write_lock();
183 let buf_lock = device_buffer.read_lock();
184 let mut event = qlock.read_buffer(&*buf_lock, host_buffer, opts)?;
185 event.wait()
186 }
187 }
188
189 pub fn enqueue_kernel(
190 &self,
191 kernel: Kernel,
192 work: &Work,
193 opts: Option<CommandQueueOptions>,
194 ) -> Output<()> {
195 unsafe {
196 let mut kernel_lock = kernel.write_lock();
197 let mut qlock = self.write_lock();
198 let event = qlock.enqueue_kernel(&mut (*kernel_lock), work, opts)?;
199 event.wait()
200 }
201 }
202
203 pub fn finish(&self) -> Output<()> {
204 unsafe {
205 let mut lock = self.write_lock();
206 lock.finish()
207 }
208 }
209
210 pub fn reference_count(&self) -> Output<u32> {
211 unsafe { self.read_lock().reference_count() }
212 }
213
214 pub fn properties(&self) -> Output<CommandQueueProperties> {
215 unsafe { self.read_lock().properties() }
216 }
217}
218
219impl PartialEq for CommandQueue {
220 fn eq(&self, other: &Self) -> bool {
221 unsafe {
222 std::ptr::eq(
223 self.read_lock().command_queue_ptr(),
224 other.read_lock().command_queue_ptr(),
225 )
226 }
227 }
228}
229
230impl Eq for CommandQueue {}
231
232#[cfg(test)]
233mod tests {
234 use crate::ll::{ClContext, ClDeviceID, CommandQueueProperties, CommandQueuePtr};
235 use crate::testing;
236
237 const SRC: &'static str = "
238 __kernel void test(__global int *i) {
239 *i += 1;
240 }";
241
242 #[test]
243 pub fn command_queue_method_context_works() {
244 let session = testing::get_session(SRC);
246 let _context: ClContext = unsafe { session.read_queue().context().unwrap() };
247 }
248
249 #[test]
250 pub fn command_queue_method_device_works() {
251 let session = testing::get_session(SRC);
252 let _device: ClDeviceID = unsafe { session.read_queue().device().unwrap() };
253 }
254
255 #[test]
256 pub fn command_queue_method_reference_count_works() {
257 let session = testing::get_session(SRC);
258 let ref_count: u32 = unsafe { session.read_queue().reference_count() }
259 .expect("CommandQueue method reference_count() failed");
260 assert_eq!(ref_count, 1);
261 }
262
263 #[test]
264 pub fn command_queue_method_properties_works() {
265 let session = testing::get_session(SRC);
266 let props: CommandQueueProperties = unsafe { session.read_queue().properties() }
267 .expect("CommandQueue method properties() failed");
268 let bits = props.bits();
269 let maybe_same_prop = CommandQueueProperties::from_bits(bits);
270 if !maybe_same_prop.is_some() {
271 panic!(
272 "
273 CommandQueue method properties returned \
274 an invalid CommandQueueProperties bitflag {:?}\
275 ",
276 bits
277 );
278 }
279 }
280
281 #[test]
282 pub fn command_queue_copy_new_works() {
283 let session = testing::get_session(SRC);
284 unsafe {
285 let cq2 = session.read_queue().create_copy().unwrap();
286 assert!(cq2.command_queue_ptr() != session.read_queue().command_queue_ptr());
287 }
288 }
289}