Skip to main content

dispatch2/
queue.rs

1use alloc::boxed::Box;
2use alloc::ffi::CString;
3use core::ffi::c_long;
4use core::ptr::NonNull;
5
6use super::utils::function_wrapper;
7use crate::generated::{
8    _dispatch_main_q, _dispatch_queue_attr_concurrent, dispatch_get_global_queue,
9    dispatch_queue_set_specific,
10};
11use crate::{
12    DispatchObject, DispatchQoS, DispatchRetained, DispatchTime, QualityOfServiceClassFloorError,
13};
14
/// Failure modes of [`DispatchQueue::after`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum QueueAfterError {
    /// Converting the requested timeout into a dispatch time would overflow.
    TimeOverflow,
}
22
23enum_with_val! {
24    /// Queue priority.
25    #[doc(alias = "dispatch_queue_priority_t")]
26    #[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
27    pub struct DispatchQueueGlobalPriority(pub c_long) {
28        /// High priority.
29        #[doc(alias = "DISPATCH_QUEUE_PRIORITY_HIGH")]
30        High = 0x2,
31        /// Default priority.
32        #[doc(alias = "DISPATCH_QUEUE_PRIORITY_DEFAULT")]
33        Default = 0x0,
34        /// Low priority.
35        #[doc(alias = "DISPATCH_QUEUE_PRIORITY_LOW")]
36        Low = -0x2,
37        /// Background priority.
38        #[doc(alias = "DISPATCH_QUEUE_PRIORITY_BACKGROUND")]
39        Background = u16::MIN as c_long,
40    }
41}
42
43/// Global queue identifier definition for [`DispatchQueue::new`] and [`DispatchQueue::new_with_target`].
44#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
45pub enum GlobalQueueIdentifier {
46    /// Standard priority based queue.
47    Priority(DispatchQueueGlobalPriority),
48    /// Quality of service priority based queue.
49    QualityOfService(DispatchQoS),
50}
51
52impl GlobalQueueIdentifier {
53    /// Convert and consume [GlobalQueueIdentifier] into its raw value.
54    #[inline]
55    pub fn to_identifier(self) -> isize {
56        match self {
57            GlobalQueueIdentifier::Priority(queue_priority) => queue_priority.0 as isize,
58            GlobalQueueIdentifier::QualityOfService(qos_class) => qos_class.0 as isize,
59        }
60    }
61}
62
63dispatch_object!(
64    /// Dispatch queue.
65    #[doc(alias = "dispatch_queue_t")]
66    #[doc(alias = "dispatch_queue_s")]
67    pub struct DispatchQueue;
68);
69
70dispatch_object_not_data!(unsafe DispatchQueue);
71
72impl DispatchQueue {
73    /// Create a new [`DispatchQueue`].
74    #[inline]
75    pub fn new(label: &str, queue_attribute: Option<&DispatchQueueAttr>) -> DispatchRetained<Self> {
76        let label = CString::new(label).expect("Invalid label!");
77
78        // SAFETY: The label is a valid C string.
79        unsafe { Self::__new(label.as_ptr(), queue_attribute) }
80    }
81
82    /// Create a new [`DispatchQueue`] with a given target [`DispatchQueue`].
83    #[inline]
84    pub fn new_with_target(
85        label: &str,
86        queue_attribute: Option<&DispatchQueueAttr>,
87        target: Option<&DispatchQueue>,
88    ) -> DispatchRetained<Self> {
89        let label = CString::new(label).expect("Invalid label!");
90
91        // SAFETY: The label is a valid C string.
92        unsafe { Self::__new_with_target(label.as_ptr(), queue_attribute, target) }
93    }
94
95    /// Return a system-defined global concurrent [`DispatchQueue`] with the priority derived from [GlobalQueueIdentifier].
96    #[inline]
97    pub fn global_queue(identifier: GlobalQueueIdentifier) -> DispatchRetained<Self> {
98        let raw_identifier = identifier.to_identifier();
99
100        // Safety: raw_identifier cannot be invalid, flags is reserved.
101        dispatch_get_global_queue(raw_identifier, 0)
102    }
103
104    /// Return the main queue.
105    // TODO: Mark this as `const` once in MSRV.
106    #[inline]
107    #[doc(alias = "dispatch_get_main_queue")]
108    pub fn main() -> &'static Self {
109        // Inline function in the header
110
111        // SAFETY: The main queue is safe to access from anywhere, and is
112        // valid forever.
113        unsafe { &_dispatch_main_q }
114    }
115
116    /// Submit a function for synchronous execution on the [`DispatchQueue`].
117    #[inline]
118    pub fn exec_sync<F>(&self, work: F)
119    where
120        F: Send + FnOnce(),
121    {
122        let work_boxed = Box::into_raw(Box::new(work)).cast();
123
124        // NOTE: `dispatch_sync*` functions are discouraged on workloops for
125        // performance reasons, but they should still work, so we won't forbid
126        // it here.
127        //
128        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
129        unsafe { Self::exec_sync_f(self, work_boxed, function_wrapper::<F>) }
130    }
131
132    /// Submit a function for asynchronous execution on the [`DispatchQueue`].
133    #[inline]
134    pub fn exec_async<F>(&self, work: F)
135    where
136        // We need `'static` to make sure any referenced values are borrowed for
137        // long enough since `work` will be performed asynchronously.
138        F: Send + FnOnce() + 'static,
139    {
140        let work_boxed = Box::into_raw(Box::new(work)).cast();
141
142        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
143        unsafe { Self::exec_async_f(self, work_boxed, function_wrapper::<F>) }
144    }
145
146    /// Enqueue a function for execution at the specified time on the [`DispatchQueue`].
147    #[inline]
148    pub fn after<F>(&self, when: DispatchTime, work: F) -> Result<(), QueueAfterError>
149    where
150        F: Send + FnOnce(),
151    {
152        let work_boxed = Box::into_raw(Box::new(work)).cast();
153
154        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
155        unsafe { Self::exec_after_f(when, self, work_boxed, function_wrapper::<F>) };
156
157        Ok(())
158    }
159
160    /// Enqueue a barrier function for asynchronous execution on the [`DispatchQueue`] and return immediately.
161    #[inline]
162    pub fn barrier_async<F>(&self, work: F)
163    where
164        // We need `'static` to make sure any referenced values are borrowed for
165        // long enough since `work` will be performed asynchronously.
166        F: Send + FnOnce() + 'static,
167    {
168        let work_boxed = Box::into_raw(Box::new(work)).cast();
169
170        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
171        unsafe { Self::barrier_async_f(self, work_boxed, function_wrapper::<F>) }
172    }
173
174    /// Enqueue a barrier function for synchronous execution on the [`DispatchQueue`] and wait until that function completes.
175    #[inline]
176    pub fn barrier_sync<F>(&self, work: F)
177    where
178        F: Send + FnOnce(),
179    {
180        let work_boxed = Box::into_raw(Box::new(work)).cast();
181
182        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
183        unsafe { Self::barrier_sync_f(self, work_boxed, function_wrapper::<F>) }
184    }
185
186    /// Submit a function for synchronous execution and mark the function as a barrier for subsequent concurrent tasks.
187    #[inline]
188    pub fn barrier_async_and_wait<F>(&self, work: F)
189    where
190        // We need `'static` to make sure any referenced values are borrowed for
191        // long enough since `work` will be performed asynchronously.
192        F: Send + FnOnce() + 'static,
193    {
194        let work_boxed = Box::into_raw(Box::new(work)).cast();
195
196        // Safety: object cannot be null and work is wrapped to avoid ABI incompatibility.
197        unsafe { Self::barrier_async_and_wait_f(self, work_boxed, function_wrapper::<F>) }
198    }
199
200    /// Sets a function at the given key that will be executed at [`DispatchQueue`] destruction.
201    #[inline]
202    pub fn set_specific<F>(&self, key: NonNull<()>, destructor: F)
203    where
204        F: Send + FnOnce(),
205    {
206        let destructor_boxed = Box::into_raw(Box::new(destructor)).cast();
207
208        // SAFETY: object cannot be null and destructor is wrapped to avoid
209        // ABI incompatibility.
210        //
211        // The key is never dereferenced, so passing _any_ pointer here is
212        // safe and allowed.
213        unsafe {
214            dispatch_queue_set_specific(self, key.cast(), destructor_boxed, function_wrapper::<F>)
215        }
216    }
217
218    /// Set the QOS class floor of the [`DispatchQueue`].
219    #[inline]
220    pub fn set_qos_class_floor(
221        &self,
222        qos_class: DispatchQoS,
223        relative_priority: i32,
224    ) -> Result<(), QualityOfServiceClassFloorError> {
225        // SAFETY: We are a queue.
226        unsafe { DispatchObject::set_qos_class_floor(self, qos_class, relative_priority) }
227    }
228
229    #[allow(missing_docs)]
230    #[doc(alias = "DISPATCH_APPLY_AUTO")]
231    pub const APPLY_AUTO: Option<&DispatchQueue> = None;
232
233    #[allow(missing_docs)]
234    #[doc(alias = "DISPATCH_TARGET_QUEUE_DEFAULT")]
235    pub const TARGET_QUEUE_DEFAULT: Option<&DispatchQueue> = None;
236
237    #[allow(missing_docs)]
238    #[doc(alias = "DISPATCH_CURRENT_QUEUE_LABEL")]
239    pub const CURRENT_QUEUE_LABEL: Option<&DispatchQueue> = None;
240}
241
242dispatch_object!(
243    /// Dispatch queue attribute.
244    #[doc(alias = "dispatch_queue_attr_t")]
245    #[doc(alias = "dispatch_queue_attr_s")]
246    pub struct DispatchQueueAttr;
247);
248
249dispatch_object_not_data!(unsafe DispatchQueueAttr);
250
251impl DispatchQueueAttr {
252    /// A dispatch queue that executes blocks serially in FIFO order.
253    #[doc(alias = "DISPATCH_QUEUE_SERIAL")]
254    pub const SERIAL: Option<&Self> = None;
255
256    // TODO(msrv): Expose this once
257    // #[doc(alias = "DISPATCH_QUEUE_CONCURRENT")]
258    // pub static CONCURRENT: Option<&Self> = {
259    //     // Safety: immutable external definition
260    //     unsafe { Some(&_dispatch_queue_attr_concurrent) }
261    // };
262
263    /// A dispatch queue that executes blocks concurrently.
264    #[inline]
265    pub fn concurrent() -> Option<&'static Self> {
266        // SAFETY: Queues are
267        unsafe { Some(&_dispatch_queue_attr_concurrent) }
268    }
269}
270
/// Hands control to libdispatch so that blocks submitted to the main queue
/// are executed. This function never returns.
#[inline]
pub fn dispatch_main() -> ! {
    extern "C" {
        // `dispatch_main` is marked DISPATCH_NOTHROW.
        #[link_name = "dispatch_main"]
        fn dispatch_main_raw() -> !;
    }

    // SAFETY: TODO: Must this be run on the main thread? Do we need to take
    // `MainThreadMarker`?
    unsafe { dispatch_main_raw() }
}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// Label given to every queue created by these tests.
    #[cfg(feature = "std")]
    const LABEL: &str = "com.github.madsmtm.objc2";

    /// The main queue can be obtained without panicking.
    #[test]
    fn test_create_main_queue() {
        let _ = DispatchQueue::main();
    }

    /// Work submitted to a serial queue is executed.
    #[test]
    #[cfg(feature = "std")]
    fn test_serial_queue() {
        let (sender, receiver) = std::sync::mpsc::channel();
        let serial = DispatchQueue::new(LABEL, DispatchQueueAttr::SERIAL);

        serial.exec_async(move || sender.send(()).unwrap());

        receiver.recv().unwrap();
    }

    /// Two work items submitted to a concurrent queue are both executed.
    #[test]
    #[cfg(feature = "std")]
    fn test_concurrent_queue() {
        let (first_sender, receiver) = std::sync::mpsc::channel();
        let second_sender = first_sender.clone();
        let concurrent = DispatchQueue::new(LABEL, DispatchQueueAttr::concurrent());

        concurrent.exec_async(move || first_sender.send(()).unwrap());
        concurrent.exec_async(move || second_sender.send(()).unwrap());

        (0..2).for_each(|_| receiver.recv().unwrap());
    }

    /// Work submitted to the default-QoS global queue is executed.
    #[test]
    #[cfg(feature = "std")]
    fn test_global_default_queue() {
        let (sender, receiver) = std::sync::mpsc::channel();
        let identifier = GlobalQueueIdentifier::QualityOfService(DispatchQoS::Default);
        let global = DispatchQueue::global_queue(identifier);

        global.exec_async(move || sender.send(()).unwrap());

        receiver.recv().unwrap();
    }

    /// A cloned queue handle can be used from inside a work item running on
    /// the same queue.
    #[test]
    #[cfg(feature = "std")]
    fn test_share_queue_across_threads() {
        let (outer_sender, receiver) = std::sync::mpsc::channel();
        let inner_sender = outer_sender.clone();
        let serial = DispatchQueue::new(LABEL, DispatchQueueAttr::SERIAL);
        let shared = serial.clone();

        serial.exec_async(move || {
            shared.exec_async(move || inner_sender.send(()).unwrap());
        });
        serial.exec_async(move || outer_sender.send(()).unwrap());

        (0..2).for_each(|_| receiver.recv().unwrap());
    }

    /// A queue handle can be moved into another thread and used there.
    #[test]
    #[cfg(feature = "std")]
    fn test_move_queue_between_threads() {
        let (sender, receiver) = std::sync::mpsc::channel();
        let serial = DispatchQueue::new(LABEL, DispatchQueueAttr::SERIAL);

        std::thread::spawn(move || {
            serial.exec_async(move || sender.send(()).unwrap());
        });

        receiver.recv().unwrap();
    }
}