// apple_cf/dispatch_queue.rs

1//! Dispatch Queue wrapper for custom queue management
2//!
3//! This module provides a safe Rust wrapper around GCD (Grand Central Dispatch) queues
4//! that can be used with `ScreenCaptureKit` streams.
5//!
6//! ## When to Use Custom Queues
7//!
8//! By default, stream output handlers are called on a system-managed queue. Use a custom
9//! queue when you need:
10//!
11//! - **Priority control** - Use `UserInteractive` `QoS` for low-latency UI updates
12//! - **Thread isolation** - Ensure handlers run on a specific queue
13//! - **Performance tuning** - Adjust queue priority based on your app's needs
14//!
15//! ## Example
16//!
17#![allow(clippy::missing_panics_doc)]
18
19//! ```rust,no_run
20//! use apple_cf::dispatch_queue::{dispatch_async_and_wait, DispatchQueue, DispatchQoS};
21//!
22//! // Create a high-priority queue for frame processing
23//! let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
24//! dispatch_async_and_wait(&queue, || {
25//!     // do queue-bound work here
26//! });
27//! ```
28
29use crate::utils::panic_safe;
30use std::ffi::{c_void, CString};
31use std::fmt;
32use std::time::Duration;
33
/// Quality-of-service classes that can be requested for a dispatch queue.
///
/// Each variant carries an explicit integer discriminant; that integer is the
/// value passed to the native bridge when a queue is created (`qos as i32`).
/// [`DispatchQoS::Default`] is the value produced by `Default::default()`.
///
/// # Examples
///
/// ```
/// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
///
/// // Latency-sensitive work that affects the UI
/// let queue = DispatchQueue::new("com.myapp.ui", DispatchQoS::UserInteractive);
///
/// // Deferred housekeeping
/// let bg_queue = DispatchQueue::new("com.myapp.background", DispatchQoS::Background);
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum DispatchQoS {
    /// Maintenance or cleanup work (code `0`).
    Background = 0,
    /// Work that may take a while to complete (code `1`).
    Utility = 1,
    /// Standard priority (code `2`); this is the `Default` variant.
    #[default]
    Default = 2,
    /// Work the user explicitly kicked off (code `3`).
    UserInitiated = 3,
    /// Work that directly affects the UI (code `4`).
    UserInteractive = 4,
}
63
/// An owned handle to a GCD dispatch queue.
///
/// The handle stores the opaque pointer returned by the native bridge. It lets
/// you supply a custom dispatch queue for stream output handling instead of
/// relying on the default queue.
///
/// Equality and hashing are derived from the stored pointer, so two handles
/// compare equal exactly when they hold the same pointer value. This matches
/// the other dispatch wrappers in this module (`DispatchGroup`,
/// `DispatchSemaphore`, `DispatchSource`).
///
/// # Example
///
/// ```no_run
/// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
///
/// let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
/// ```
#[derive(PartialEq, Eq, Hash)]
pub struct DispatchQueue {
    /// Opaque queue pointer obtained from the native bridge.
    ptr: *const c_void,
}

// SAFETY: `dispatch_queue_t` is documented by Apple as safe to use from any
// thread.  The Rust wrapper only holds an opaque pointer and never performs
// interior mutation on it outside of the underlying GCD primitives, which are
// themselves thread-safe.
unsafe impl Send for DispatchQueue {}
unsafe impl Sync for DispatchQueue {}
86
87impl DispatchQueue {
88    /// Creates a new dispatch queue with the specified label and `QoS`
89    ///
90    /// # Arguments
91    ///
92    /// * `label` - A string label for the queue (e.g., "com.myapp.capture")
93    /// * `qos` - The quality of service level for the queue
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
99    ///
100    /// let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
101    /// // Use the queue with SCStream's add_output_handler_with_queue
102    /// ```
103    ///
104    /// # Panics
105    ///
106    /// Panics if the label contains null bytes or if queue creation fails
107    #[must_use]
108    pub fn new(label: &str, qos: DispatchQoS) -> Self {
109        let c_label = CString::new(label).expect("Label contains null byte");
110        let ptr = unsafe { crate::ffi::acf_dispatch_queue_create(c_label.as_ptr(), qos as i32) };
111        assert!(!ptr.is_null(), "Failed to create dispatch queue");
112        Self { ptr }
113    }
114
115    /// Returns the raw pointer to the dispatch queue
116    ///
117    /// This is used internally for FFI calls (and for testing)
118    #[must_use]
119    pub const fn as_ptr(&self) -> *const c_void {
120        self.ptr
121    }
122
123    #[must_use]
124    const fn as_mut_ptr(&self) -> *mut c_void {
125        self.ptr.cast_mut()
126    }
127}
128
129impl Clone for DispatchQueue {
130    fn clone(&self) -> Self {
131        unsafe {
132            Self {
133                ptr: crate::ffi::dispatch_queue_retain(self.ptr),
134            }
135        }
136    }
137}
138
139impl Drop for DispatchQueue {
140    fn drop(&mut self) {
141        unsafe {
142            crate::ffi::dispatch_queue_release(self.ptr);
143        }
144    }
145}
146
147impl fmt::Debug for DispatchQueue {
148    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
149        f.debug_struct("DispatchQueue")
150            .field("ptr", &self.ptr)
151            .finish()
152    }
153}
154
155impl fmt::Display for DispatchQueue {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        write!(f, "DispatchQueue")
158    }
159}
160
/// Converts an optional timeout into the millisecond count passed to the
/// bridge's wait functions.
///
/// * `None` becomes the sentinel `-1`
///   (NOTE(review): presumably "wait forever" on the native side — confirm).
/// * `Some(d)` becomes `d` in whole milliseconds, saturating at `i64::MAX`
///   when the value does not fit in an `i64`.
fn timeout_ms(timeout: Option<Duration>) -> i64 {
    match timeout {
        None => -1,
        Some(limit) => i64::try_from(limit.as_millis()).unwrap_or(i64::MAX),
    }
}
166
/// Heap-allocated context for a closure that is run at most once through
/// `dispatch_once_trampoline`.
struct DispatchOnceTask {
    /// Call-site label passed to the panic guard together with the closure.
    site: &'static str,
    /// The pending closure; the trampoline `take`s it before running it.
    work: Option<Box<dyn FnOnce() + Send + 'static>>,
}
171
/// Shared context for `dispatch_apply`: a closure that
/// `dispatch_apply_trampoline` calls with the iteration index.
struct DispatchApplyTask {
    /// Callback that receives the iteration index.
    work: Box<dyn Fn(usize) + Send + Sync + 'static>,
}
175
176extern "C" fn dispatch_once_trampoline(context: *mut c_void) {
177    if context.is_null() {
178        return;
179    }
180    let mut task = unsafe { Box::from_raw(context.cast::<DispatchOnceTask>()) };
181    if let Some(work) = task.work.take() {
182        panic_safe::catch_user_panic(task.site, work);
183    }
184}
185
186extern "C" fn dispatch_apply_trampoline(iteration: usize, context: *mut c_void) {
187    if context.is_null() {
188        return;
189    }
190    let task = unsafe { &*context.cast::<DispatchApplyTask>() };
191    panic_safe::catch_user_panic("dispatch_apply", || (task.work)(iteration));
192}
193
194/// Submit `work` to `queue` and return immediately.
195pub fn dispatch_async<F>(queue: &DispatchQueue, work: F)
196where
197    F: FnOnce() + Send + 'static,
198{
199    let task = Box::new(DispatchOnceTask {
200        site: "dispatch_async",
201        work: Some(Box::new(work)),
202    });
203    unsafe {
204        crate::ffi::acf_dispatch_async_f(
205            queue.as_mut_ptr(),
206            Box::into_raw(task).cast(),
207            dispatch_once_trampoline,
208        );
209    }
210}
211
212/// Submit `work` to `queue` and wait until it finishes.
213pub fn dispatch_async_and_wait<F>(queue: &DispatchQueue, work: F)
214where
215    F: FnOnce() + Send + 'static,
216{
217    let task = Box::new(DispatchOnceTask {
218        site: "dispatch_async_and_wait",
219        work: Some(Box::new(work)),
220    });
221    unsafe {
222        crate::ffi::acf_dispatch_async_and_wait_f(
223            queue.as_mut_ptr(),
224            Box::into_raw(task).cast(),
225            dispatch_once_trampoline,
226        );
227    }
228}
229
230/// Run `work` for every `iteration` on `queue`, waiting for all iterations to finish.
231pub fn dispatch_apply<F>(iterations: usize, queue: &DispatchQueue, work: F)
232where
233    F: Fn(usize) + Send + Sync + 'static,
234{
235    if iterations == 0 {
236        return;
237    }
238    let task = Box::new(DispatchApplyTask {
239        work: Box::new(work),
240    });
241    let raw = Box::into_raw(task);
242    unsafe {
243        crate::ffi::acf_dispatch_apply_f(
244            iterations,
245            queue.as_mut_ptr(),
246            raw.cast(),
247            dispatch_apply_trampoline,
248        );
249        drop(Box::from_raw(raw));
250    }
251}
252
/// Owned handle to a native `DispatchGroup`.
///
/// Equality and hashing are derived from the stored pointer, so two handles
/// compare equal exactly when they hold the same pointer value.
#[derive(Eq, PartialEq, Hash)]
pub struct DispatchGroup {
    /// Opaque group pointer obtained from the native bridge.
    ptr: *mut c_void,
}

// SAFETY: `dispatch_group_t` is a thread-safe GCD primitive, so the handle may
// be moved to and shared between threads.
unsafe impl Send for DispatchGroup {}
unsafe impl Sync for DispatchGroup {}
263
264impl DispatchGroup {
265    /// Create a new empty group.
266    #[must_use]
267    pub fn new() -> Self {
268        let ptr = unsafe { crate::ffi::acf_dispatch_group_create() };
269        assert!(!ptr.is_null(), "failed to create DispatchGroup");
270        Self { ptr }
271    }
272
273    /// Enter the group.
274    pub fn enter(&self) {
275        unsafe { crate::ffi::acf_dispatch_group_enter(self.ptr) };
276    }
277
278    /// Leave the group.
279    pub fn leave(&self) {
280        unsafe { crate::ffi::acf_dispatch_group_leave(self.ptr) };
281    }
282
283    /// Wait for the group to finish.
284    #[must_use]
285    pub fn wait(&self, timeout: Option<Duration>) -> bool {
286        unsafe { crate::ffi::acf_dispatch_group_wait(self.ptr, timeout_ms(timeout)) }
287    }
288}
289
290impl Default for DispatchGroup {
291    fn default() -> Self {
292        Self::new()
293    }
294}
295
296impl Clone for DispatchGroup {
297    fn clone(&self) -> Self {
298        Self {
299            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
300        }
301    }
302}
303
304impl Drop for DispatchGroup {
305    fn drop(&mut self) {
306        unsafe { crate::ffi::acf_object_release(self.ptr) };
307    }
308}
309
310impl fmt::Debug for DispatchGroup {
311    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312        f.debug_struct("DispatchGroup")
313            .field("ptr", &self.ptr)
314            .finish()
315    }
316}
317
/// Owned handle to a native `DispatchSemaphore`.
///
/// Equality and hashing are derived from the stored pointer, so two handles
/// compare equal exactly when they hold the same pointer value.
#[derive(Eq, PartialEq, Hash)]
pub struct DispatchSemaphore {
    /// Opaque semaphore pointer obtained from the native bridge.
    ptr: *mut c_void,
}

// SAFETY: `dispatch_semaphore_t` is a thread-safe GCD primitive built for
// signalling between threads, so the handle may be moved and shared freely.
unsafe impl Send for DispatchSemaphore {}
unsafe impl Sync for DispatchSemaphore {}
328
329impl DispatchSemaphore {
330    /// Create a semaphore with an initial signal count.
331    #[must_use]
332    pub fn new(value: i64) -> Self {
333        let ptr = unsafe { crate::ffi::acf_dispatch_semaphore_create(value) };
334        assert!(!ptr.is_null(), "failed to create DispatchSemaphore");
335        Self { ptr }
336    }
337
338    /// Signal the semaphore.
339    #[must_use]
340    pub fn signal(&self) -> i64 {
341        unsafe { crate::ffi::acf_dispatch_semaphore_signal(self.ptr) }
342    }
343
344    /// Wait for the semaphore.
345    #[must_use]
346    pub fn wait(&self, timeout: Option<Duration>) -> bool {
347        unsafe { crate::ffi::acf_dispatch_semaphore_wait(self.ptr, timeout_ms(timeout)) }
348    }
349}
350
351impl Clone for DispatchSemaphore {
352    fn clone(&self) -> Self {
353        Self {
354            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
355        }
356    }
357}
358
359impl Drop for DispatchSemaphore {
360    fn drop(&mut self) {
361        unsafe { crate::ffi::acf_object_release(self.ptr) };
362    }
363}
364
365impl fmt::Debug for DispatchSemaphore {
366    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
367        f.debug_struct("DispatchSemaphore")
368            .field("ptr", &self.ptr)
369            .finish()
370    }
371}
372
/// Owned handle to a minimal, timer-backed `DispatchSource`.
///
/// Equality and hashing are derived from the stored pointer, so two handles
/// compare equal exactly when they hold the same pointer value.
#[derive(Eq, PartialEq, Hash)]
pub struct DispatchSource {
    /// Opaque timer-source pointer obtained from the native bridge.
    ptr: *mut c_void,
}

// SAFETY: `dispatch_source_t` is a thread-safe GCD primitive.  The wrapper
// keeps only the opaque pointer and performs no interior mutation itself; GCD
// is responsible for safe cross-thread access.
unsafe impl Send for DispatchSource {}
unsafe impl Sync for DispatchSource {}
384
385impl DispatchSource {
386    /// Create a repeating timer source.
387    #[must_use]
388    pub fn timer(interval: Duration, leeway: Duration) -> Self {
389        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or(u64::MAX);
390        let leeway_ms = u64::try_from(leeway.as_millis()).unwrap_or(u64::MAX);
391        let ptr = unsafe { crate::ffi::acf_dispatch_source_timer_create(interval_ms, leeway_ms) };
392        assert!(!ptr.is_null(), "failed to create DispatchSource timer");
393        Self { ptr }
394    }
395
396    /// Resume the timer source after creation.
397    pub fn resume(&self) {
398        unsafe { crate::ffi::acf_dispatch_source_timer_resume(self.ptr) };
399    }
400
401    /// Cancel the source.
402    pub fn cancel(&self) {
403        unsafe { crate::ffi::acf_dispatch_source_timer_cancel(self.ptr) };
404    }
405
406    /// Number of timer firings observed by the bridge.
407    #[must_use]
408    pub fn fire_count(&self) -> u64 {
409        unsafe { crate::ffi::acf_dispatch_source_timer_fire_count(self.ptr) }
410    }
411}
412
413impl Clone for DispatchSource {
414    fn clone(&self) -> Self {
415        Self {
416            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
417        }
418    }
419}
420
421impl Drop for DispatchSource {
422    fn drop(&mut self) {
423        unsafe { crate::ffi::acf_object_release(self.ptr) };
424    }
425}
426
427impl fmt::Debug for DispatchSource {
428    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
429        f.debug_struct("DispatchSource")
430            .field("ptr", &self.ptr)
431            .field("fire_count", &self.fire_count())
432            .finish()
433    }
434}