Skip to main content

apple_cf/
dispatch_queue.rs

1//! Dispatch Queue wrapper for custom queue management
2//!
3//! This module provides a safe Rust wrapper around GCD (Grand Central Dispatch) queues
4//! that can be used with `ScreenCaptureKit` streams.
5//!
6//! ## When to Use Custom Queues
7//!
8//! By default, stream output handlers are called on a system-managed queue. Use a custom
9//! queue when you need:
10//!
11//! - **Priority control** - Use `UserInteractive` `QoS` for low-latency UI updates
12//! - **Thread isolation** - Ensure handlers run on a specific queue
13//! - **Performance tuning** - Adjust queue priority based on your app's needs
14//!
15//! ## Example
16//!
17#![allow(clippy::missing_panics_doc)]
18
19//! ```rust,no_run
20//! use apple_cf::dispatch_queue::{dispatch_async_and_wait, DispatchQueue, DispatchQoS};
21//!
22//! // Create a high-priority queue for frame processing
23//! let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
24//! dispatch_async_and_wait(&queue, || {
25//!     // do queue-bound work here
26//! });
27//! ```
28
29use crate::utils::panic_safe;
30use std::ffi::{c_void, CString};
31use std::fmt;
32use std::time::Duration;
33
34/// Quality of Service levels for dispatch queues
35///
36/// These `QoS` levels help the system prioritize work appropriately.
37///
38/// # Examples
39///
40/// ```
41/// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
42///
43/// // High priority for UI-affecting work
44/// let queue = DispatchQueue::new("com.myapp.ui", DispatchQoS::UserInteractive);
45///
46/// // Lower priority for background tasks
47/// let bg_queue = DispatchQueue::new("com.myapp.background", DispatchQoS::Background);
48/// ```
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
50pub enum DispatchQoS {
51    /// Background `QoS` - for maintenance or cleanup tasks
52    Background = 0,
53    /// Utility `QoS` - for tasks that may take some time
54    Utility = 1,
55    /// Default `QoS` - standard priority
56    #[default]
57    Default = 2,
58    /// User Initiated `QoS` - for tasks initiated by the user
59    UserInitiated = 3,
60    /// User Interactive `QoS` - for tasks that affect the UI
61    UserInteractive = 4,
62}
63
64/// A wrapper around GCD `DispatchQueue`
65///
66/// This allows you to provide a custom dispatch queue for stream output handling
67/// instead of using the default queue.
68///
69/// # Example
70///
71/// ```no_run
72/// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
73///
74/// let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
75/// ```
76pub struct DispatchQueue {
77    ptr: *const c_void,
78}
79
80unsafe impl Send for DispatchQueue {}
81unsafe impl Sync for DispatchQueue {}
82
83impl DispatchQueue {
84    /// Creates a new dispatch queue with the specified label and `QoS`
85    ///
86    /// # Arguments
87    ///
88    /// * `label` - A string label for the queue (e.g., "com.myapp.capture")
89    /// * `qos` - The quality of service level for the queue
90    ///
91    /// # Examples
92    ///
93    /// ```
94    /// use apple_cf::dispatch_queue::{DispatchQueue, DispatchQoS};
95    ///
96    /// let queue = DispatchQueue::new("com.myapp.capture", DispatchQoS::UserInteractive);
97    /// // Use the queue with SCStream's add_output_handler_with_queue
98    /// ```
99    ///
100    /// # Panics
101    ///
102    /// Panics if the label contains null bytes or if queue creation fails
103    #[must_use]
104    pub fn new(label: &str, qos: DispatchQoS) -> Self {
105        let c_label = CString::new(label).expect("Label contains null byte");
106        let ptr = unsafe { crate::ffi::dispatch_queue_create(c_label.as_ptr(), qos as i32) };
107        assert!(!ptr.is_null(), "Failed to create dispatch queue");
108        Self { ptr }
109    }
110
111    /// Returns the raw pointer to the dispatch queue
112    ///
113    /// This is used internally for FFI calls (and for testing)
114    #[must_use]
115    pub const fn as_ptr(&self) -> *const c_void {
116        self.ptr
117    }
118
119    #[must_use]
120    const fn as_mut_ptr(&self) -> *mut c_void {
121        self.ptr.cast_mut()
122    }
123}
124
125impl Clone for DispatchQueue {
126    fn clone(&self) -> Self {
127        unsafe {
128            Self {
129                ptr: crate::ffi::dispatch_queue_retain(self.ptr),
130            }
131        }
132    }
133}
134
135impl Drop for DispatchQueue {
136    fn drop(&mut self) {
137        unsafe {
138            crate::ffi::dispatch_queue_release(self.ptr);
139        }
140    }
141}
142
143impl fmt::Debug for DispatchQueue {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.debug_struct("DispatchQueue")
146            .field("ptr", &self.ptr)
147            .finish()
148    }
149}
150
151impl fmt::Display for DispatchQueue {
152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
153        write!(f, "DispatchQueue")
154    }
155}
156
157fn timeout_ms(timeout: Option<Duration>) -> i64 {
158    timeout.map_or(-1, |duration| {
159        i64::try_from(duration.as_millis()).unwrap_or(i64::MAX)
160    })
161}
162
163struct DispatchOnceTask {
164    site: &'static str,
165    work: Option<Box<dyn FnOnce() + Send + 'static>>,
166}
167
168struct DispatchApplyTask {
169    work: Box<dyn Fn(usize) + Send + Sync + 'static>,
170}
171
172extern "C" fn dispatch_once_trampoline(context: *mut c_void) {
173    if context.is_null() {
174        return;
175    }
176    let mut task = unsafe { Box::from_raw(context.cast::<DispatchOnceTask>()) };
177    if let Some(work) = task.work.take() {
178        panic_safe::catch_user_panic(task.site, work);
179    }
180}
181
182extern "C" fn dispatch_apply_trampoline(iteration: usize, context: *mut c_void) {
183    if context.is_null() {
184        return;
185    }
186    let task = unsafe { &*context.cast::<DispatchApplyTask>() };
187    panic_safe::catch_user_panic("dispatch_apply", || (task.work)(iteration));
188}
189
190/// Submit `work` to `queue` and return immediately.
191pub fn dispatch_async<F>(queue: &DispatchQueue, work: F)
192where
193    F: FnOnce() + Send + 'static,
194{
195    let task = Box::new(DispatchOnceTask {
196        site: "dispatch_async",
197        work: Some(Box::new(work)),
198    });
199    unsafe {
200        crate::ffi::acf_dispatch_async_f(
201            queue.as_mut_ptr(),
202            Box::into_raw(task).cast(),
203            dispatch_once_trampoline,
204        );
205    }
206}
207
208/// Submit `work` to `queue` and wait until it finishes.
209pub fn dispatch_async_and_wait<F>(queue: &DispatchQueue, work: F)
210where
211    F: FnOnce() + Send + 'static,
212{
213    let task = Box::new(DispatchOnceTask {
214        site: "dispatch_async_and_wait",
215        work: Some(Box::new(work)),
216    });
217    unsafe {
218        crate::ffi::acf_dispatch_async_and_wait_f(
219            queue.as_mut_ptr(),
220            Box::into_raw(task).cast(),
221            dispatch_once_trampoline,
222        );
223    }
224}
225
226/// Run `work` for every `iteration` on `queue`, waiting for all iterations to finish.
227pub fn dispatch_apply<F>(iterations: usize, queue: &DispatchQueue, work: F)
228where
229    F: Fn(usize) + Send + Sync + 'static,
230{
231    if iterations == 0 {
232        return;
233    }
234    let task = Box::new(DispatchApplyTask {
235        work: Box::new(work),
236    });
237    let raw = Box::into_raw(task);
238    unsafe {
239        crate::ffi::acf_dispatch_apply_f(
240            iterations,
241            queue.as_mut_ptr(),
242            raw.cast(),
243            dispatch_apply_trampoline,
244        );
245        drop(Box::from_raw(raw));
246    }
247}
248
249/// Wrapper around `DispatchGroup`.
250#[derive(PartialEq, Eq, Hash)]
251pub struct DispatchGroup {
252    ptr: *mut c_void,
253}
254
255unsafe impl Send for DispatchGroup {}
256unsafe impl Sync for DispatchGroup {}
257
258impl DispatchGroup {
259    /// Create a new empty group.
260    #[must_use]
261    pub fn new() -> Self {
262        let ptr = unsafe { crate::ffi::acf_dispatch_group_create() };
263        assert!(!ptr.is_null(), "failed to create DispatchGroup");
264        Self { ptr }
265    }
266
267    /// Enter the group.
268    pub fn enter(&self) {
269        unsafe { crate::ffi::acf_dispatch_group_enter(self.ptr) };
270    }
271
272    /// Leave the group.
273    pub fn leave(&self) {
274        unsafe { crate::ffi::acf_dispatch_group_leave(self.ptr) };
275    }
276
277    /// Wait for the group to finish.
278    #[must_use]
279    pub fn wait(&self, timeout: Option<Duration>) -> bool {
280        unsafe { crate::ffi::acf_dispatch_group_wait(self.ptr, timeout_ms(timeout)) }
281    }
282}
283
284impl Default for DispatchGroup {
285    fn default() -> Self {
286        Self::new()
287    }
288}
289
290impl Clone for DispatchGroup {
291    fn clone(&self) -> Self {
292        Self {
293            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
294        }
295    }
296}
297
298impl Drop for DispatchGroup {
299    fn drop(&mut self) {
300        unsafe { crate::ffi::acf_object_release(self.ptr) };
301    }
302}
303
304impl fmt::Debug for DispatchGroup {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        f.debug_struct("DispatchGroup")
307            .field("ptr", &self.ptr)
308            .finish()
309    }
310}
311
312/// Wrapper around `DispatchSemaphore`.
313#[derive(PartialEq, Eq, Hash)]
314pub struct DispatchSemaphore {
315    ptr: *mut c_void,
316}
317
318unsafe impl Send for DispatchSemaphore {}
319unsafe impl Sync for DispatchSemaphore {}
320
321impl DispatchSemaphore {
322    /// Create a semaphore with an initial signal count.
323    #[must_use]
324    pub fn new(value: i64) -> Self {
325        let ptr = unsafe { crate::ffi::acf_dispatch_semaphore_create(value) };
326        assert!(!ptr.is_null(), "failed to create DispatchSemaphore");
327        Self { ptr }
328    }
329
330    /// Signal the semaphore.
331    #[must_use]
332    pub fn signal(&self) -> i64 {
333        unsafe { crate::ffi::acf_dispatch_semaphore_signal(self.ptr) }
334    }
335
336    /// Wait for the semaphore.
337    #[must_use]
338    pub fn wait(&self, timeout: Option<Duration>) -> bool {
339        unsafe { crate::ffi::acf_dispatch_semaphore_wait(self.ptr, timeout_ms(timeout)) }
340    }
341}
342
343impl Clone for DispatchSemaphore {
344    fn clone(&self) -> Self {
345        Self {
346            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
347        }
348    }
349}
350
351impl Drop for DispatchSemaphore {
352    fn drop(&mut self) {
353        unsafe { crate::ffi::acf_object_release(self.ptr) };
354    }
355}
356
357impl fmt::Debug for DispatchSemaphore {
358    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359        f.debug_struct("DispatchSemaphore")
360            .field("ptr", &self.ptr)
361            .finish()
362    }
363}
364
365/// Minimal timer-backed `DispatchSource` wrapper.
366#[derive(PartialEq, Eq, Hash)]
367pub struct DispatchSource {
368    ptr: *mut c_void,
369}
370
371unsafe impl Send for DispatchSource {}
372unsafe impl Sync for DispatchSource {}
373
374impl DispatchSource {
375    /// Create a repeating timer source.
376    #[must_use]
377    pub fn timer(interval: Duration, leeway: Duration) -> Self {
378        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or(u64::MAX);
379        let leeway_ms = u64::try_from(leeway.as_millis()).unwrap_or(u64::MAX);
380        let ptr = unsafe { crate::ffi::acf_dispatch_source_timer_create(interval_ms, leeway_ms) };
381        assert!(!ptr.is_null(), "failed to create DispatchSource timer");
382        Self { ptr }
383    }
384
385    /// Resume the timer source after creation.
386    pub fn resume(&self) {
387        unsafe { crate::ffi::acf_dispatch_source_timer_resume(self.ptr) };
388    }
389
390    /// Cancel the source.
391    pub fn cancel(&self) {
392        unsafe { crate::ffi::acf_dispatch_source_timer_cancel(self.ptr) };
393    }
394
395    /// Number of timer firings observed by the bridge.
396    #[must_use]
397    pub fn fire_count(&self) -> u64 {
398        unsafe { crate::ffi::acf_dispatch_source_timer_fire_count(self.ptr) }
399    }
400}
401
402impl Clone for DispatchSource {
403    fn clone(&self) -> Self {
404        Self {
405            ptr: unsafe { crate::ffi::acf_object_retain(self.ptr) },
406        }
407    }
408}
409
410impl Drop for DispatchSource {
411    fn drop(&mut self) {
412        unsafe { crate::ffi::acf_object_release(self.ptr) };
413    }
414}
415
416impl fmt::Debug for DispatchSource {
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        f.debug_struct("DispatchSource")
419            .field("ptr", &self.ptr)
420            .field("fire_count", &self.fire_count())
421            .finish()
422    }
423}