Skip to main content

systemconfiguration/
async_api.rs

1#![cfg(feature = "async")]
2
3//! Async event streams for `SystemConfiguration` callbacks.
4//!
5//! Enable this module with `--features async`.
6
7use std::ffi::c_void;
8
9use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
10
11use crate::{
12    bridge::{self, CStringArray},
13    error::Result,
14    ffi, PreferencesNotification, ReachabilityFlags, SystemConfigurationError,
15};
16
/// Owns one native subscription together with the heap-allocated stream
/// sender that feeds it; both are released in `Drop`.
///
/// The sender's element type is erased so that a single handle type can be
/// shared by every stream in this module.
struct SubscriptionHandle {
    /// Opaque subscription pointer returned by a subscribe thunk.  Passed to
    /// `unsubscribe_fn` on drop when it is non-null.
    ptr: *mut c_void,
    /// Type-erased `*mut AsyncStreamSender<T>` (see `SubscriptionHandle::new`).
    /// Passed to `drop_sender_fn` on drop when it is non-null.
    sender_ptr: *mut c_void,
    /// Unsubscribe thunk that is invoked with `ptr`.
    unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
    /// `drop_sender::<T>` instantiated for the same `T` that `sender_ptr` was
    /// erased from, so the sender is freed with its real type.
    drop_sender_fn: unsafe fn(*mut c_void),
}
23
24impl SubscriptionHandle {
25    fn new<T>(
26        ptr: *mut c_void,
27        sender_ptr: *mut AsyncStreamSender<T>,
28        unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
29    ) -> Self {
30        Self {
31            ptr,
32            sender_ptr: sender_ptr.cast::<c_void>(),
33            unsubscribe_fn,
34            drop_sender_fn: drop_sender::<T>,
35        }
36    }
37}
38
39impl Drop for SubscriptionHandle {
40    fn drop(&mut self) {
41        if !self.ptr.is_null() {
42            unsafe { (self.unsubscribe_fn)(self.ptr) };
43        }
44        if !self.sender_ptr.is_null() {
45            unsafe { (self.drop_sender_fn)(self.sender_ptr) };
46        }
47    }
48}
49
// SAFETY: `SubscriptionHandle` only touches its two raw pointers in `Drop`,
// each exactly once.  `ptr` was returned by a Swift subscribe thunk and is
// only ever passed back into the matching unsubscribe thunk.  `sender_ptr`
// was created with `Box::into_raw` and is reconstituted exactly once via
// `drop_sender_fn`.
//
// Note that `sender_ptr` is NOT exclusive to the handle: the same pointer is
// handed to the subscribe thunk as the callback context, and the
// `*_async_cb` callbacks dereference it as a shared reference.  Moving the
// handle to another thread is therefore sound only if (a) the unsubscribe
// thunk stops all callbacks before `Drop` frees the sender, and (b) the
// sender may be used from the callback thread and dropped on whichever
// thread drops the handle.
// NOTE(review): (b) assumes `AsyncStreamSender<T>` is `Send + Sync` for every
// event type used in this module — confirm against `doom_fish_utils`.
unsafe impl Send for SubscriptionHandle {}
// SAFETY: `SubscriptionHandle` exposes no method taking `&self`; the two
// pointers are read only in `Drop`, which requires exclusive access.  Holding
// a `&SubscriptionHandle` on several threads at once therefore gives none of
// them a way to reach the pointers.
unsafe impl Sync for SubscriptionHandle {}
62
63unsafe fn drop_sender<T>(raw: *mut c_void) {
64    if raw.is_null() {
65        return;
66    }
67    unsafe {
68        drop(Box::from_raw(raw.cast::<AsyncStreamSender<T>>()));
69    }
70}
71
/// One notification delivered for a subscription set up through
/// `SCDynamicStoreSetNotificationKeys`.
#[derive(Debug, Clone)]
pub struct DynamicStoreNotificationEvent {
    /// Keys reported as changed by this notification.  Empty when the
    /// callback was invoked without a payload.
    pub changed_keys: Vec<String>,
}
78
/// Stream of `SCDynamicStore` change notifications.
///
/// Created with `DynamicStoreNotificationStream::subscribe`.  The native
/// subscription stays registered for as long as this value is alive.
pub struct DynamicStoreNotificationStream {
    /// Receiving side of the event buffer.  Fields drop in declaration order,
    /// so this is dropped before `_handle` unsubscribes.
    inner: BoundedAsyncStream<DynamicStoreNotificationEvent>,
    /// Keeps the subscription and its sender alive; never read, only dropped.
    _handle: SubscriptionHandle,
}
84
85unsafe extern "C" fn dynamic_store_async_cb(kind: i32, payload: *mut c_void, ctx: *mut c_void) {
86    if kind != 0 || ctx.is_null() {
87        return;
88    }
89
90    // SAFETY: `ctx` is the `sender_ptr` created in `subscribe` via
91    // `Box::into_raw`.  It remains valid for the lifetime of the
92    // `SubscriptionHandle` (which outlives every callback invocation because
93    // `unsubscribe` joins the run-loop thread before `Drop` frees the sender).
94    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<DynamicStoreNotificationEvent>>() };
95    let changed_keys = if payload.is_null() {
96        Vec::new()
97    } else {
98        bridge::take_string_array(payload)
99    };
100    doom_fish_utils::panic_safe::catch_user_panic("dynamic_store_async_cb", || {
101        sender.push(DynamicStoreNotificationEvent { changed_keys });
102    });
103}
104
105impl DynamicStoreNotificationStream {
106    /// Wraps `SCDynamicStoreNotificationSubscribe`.
107    pub fn subscribe(
108        name: &str,
109        keys: &[&str],
110        patterns: &[&str],
111        capacity: usize,
112    ) -> Result<Self> {
113        let c_name = bridge::cstring(name, "sc_dynamic_store_notification_subscribe")?;
114        let c_keys = CStringArray::new(keys, "sc_dynamic_store_notification_subscribe")?;
115        let c_patterns = CStringArray::new(patterns, "sc_dynamic_store_notification_subscribe")?;
116
117        let (stream, sender) = BoundedAsyncStream::new(capacity);
118        let sender_ptr = Box::into_raw(Box::new(sender));
119
120        let handle_ptr = unsafe {
121            ffi::async_api::sc_dynamic_store_notification_subscribe(
122                c_name.as_ptr(),
123                c_keys.as_ptr(),
124                c_keys.count(),
125                c_patterns.as_ptr(),
126                c_patterns.count(),
127                Some(dynamic_store_async_cb),
128                sender_ptr.cast::<c_void>(),
129            )
130        };
131
132        if handle_ptr.is_null() {
133            unsafe {
134                drop(Box::from_raw(sender_ptr));
135            }
136            return Err(SystemConfigurationError::last(
137                "sc_dynamic_store_notification_subscribe",
138            ));
139        }
140
141        Ok(Self {
142            inner: stream,
143            _handle: SubscriptionHandle::new(
144                handle_ptr,
145                sender_ptr,
146                ffi::async_api::sc_dynamic_store_notification_unsubscribe,
147            ),
148        })
149    }
150
151    /// Wraps the next buffered `SCDynamicStore` notification.
152    pub const fn next(&self) -> NextItem<'_, DynamicStoreNotificationEvent> {
153        self.inner.next()
154    }
155
156    /// Wraps a helper on `SCDynamicStore`.
157    pub fn try_next(&self) -> Option<DynamicStoreNotificationEvent> {
158        self.inner.try_next()
159    }
160
161    /// Wraps a helper on `SCDynamicStore`.
162    pub fn buffered_count(&self) -> usize {
163        self.inner.buffered_count()
164    }
165}
166
167#[derive(Debug, Clone, Copy)]
168/// Wraps a callback payload from `SCNetworkReachabilitySetCallback`.
169pub struct ReachabilityEvent {
170    /// Wraps the reachability flags delivered by `SCNetworkReachabilitySetCallback`.
171    pub flags: ReachabilityFlags,
172}
173
/// Stream of `SCNetworkReachability` notifications.
///
/// Created with `ReachabilityStream::subscribe`.  The native subscription
/// stays registered for as long as this value is alive.
pub struct ReachabilityStream {
    /// Receiving side of the event buffer.  Fields drop in declaration order,
    /// so this is dropped before `_handle` unsubscribes.
    inner: BoundedAsyncStream<ReachabilityEvent>,
    /// Keeps the subscription and its sender alive; never read, only dropped.
    _handle: SubscriptionHandle,
}
179
180unsafe extern "C" fn reachability_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
181    if ctx.is_null() {
182        return;
183    }
184
185    // SAFETY: same lifetime contract as `dynamic_store_async_cb` — `ctx` is a
186    // `Box::into_raw`-leaked `AsyncStreamSender<ReachabilityEvent>` that lives
187    // until `SubscriptionHandle::Drop` calls `drop_sender_fn`.  Reachability
188    // uses a dispatch queue (not a dedicated thread), so the queue is stopped
189    // before the sender is freed.
190    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ReachabilityEvent>>() };
191    doom_fish_utils::panic_safe::catch_user_panic("reachability_async_cb", || {
192        sender.push(ReachabilityEvent {
193            flags: ReachabilityFlags(u32::from_ne_bytes(kind.to_ne_bytes())),
194        });
195    });
196}
197
198impl ReachabilityStream {
199    /// Wraps `SCReachabilityNotificationSubscribe`.
200    pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
201        let c_name = bridge::cstring(name, "sc_reachability_notification_subscribe")?;
202
203        let (stream, sender) = BoundedAsyncStream::new(capacity);
204        let sender_ptr = Box::into_raw(Box::new(sender));
205
206        let handle_ptr = unsafe {
207            ffi::async_api::sc_reachability_notification_subscribe(
208                c_name.as_ptr(),
209                Some(reachability_async_cb),
210                sender_ptr.cast::<c_void>(),
211            )
212        };
213
214        if handle_ptr.is_null() {
215            unsafe {
216                drop(Box::from_raw(sender_ptr));
217            }
218            return Err(SystemConfigurationError::last(
219                "sc_reachability_notification_subscribe",
220            ));
221        }
222
223        Ok(Self {
224            inner: stream,
225            _handle: SubscriptionHandle::new(
226                handle_ptr,
227                sender_ptr,
228                ffi::async_api::sc_reachability_notification_unsubscribe,
229            ),
230        })
231    }
232
233    /// Wraps the next buffered `SCNetworkReachability` notification.
234    pub const fn next(&self) -> NextItem<'_, ReachabilityEvent> {
235        self.inner.next()
236    }
237
238    /// Wraps a helper on `SCNetworkReachability`.
239    pub fn try_next(&self) -> Option<ReachabilityEvent> {
240        self.inner.try_next()
241    }
242
243    /// Wraps a helper on `SCNetworkReachability`.
244    pub fn buffered_count(&self) -> usize {
245        self.inner.buffered_count()
246    }
247}
248
249#[derive(Debug, Clone, Copy)]
250/// Wraps a callback payload from `SCPreferencesSetCallback`.
251pub struct PreferencesNotificationEvent {
252    /// Wraps the notification flags delivered by `SCPreferencesSetCallback`.
253    pub notification: PreferencesNotification,
254}
255
/// Stream of `SCPreferences` notifications.
///
/// Created with `PreferencesNotificationStream::subscribe`.  The native
/// subscription stays registered for as long as this value is alive.
pub struct PreferencesNotificationStream {
    /// Receiving side of the event buffer.  Fields drop in declaration order,
    /// so this is dropped before `_handle` unsubscribes.
    inner: BoundedAsyncStream<PreferencesNotificationEvent>,
    /// Keeps the subscription and its sender alive; never read, only dropped.
    _handle: SubscriptionHandle,
}
261
262unsafe extern "C" fn preferences_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
263    if ctx.is_null() {
264        return;
265    }
266
267    // SAFETY: same lifetime contract as `dynamic_store_async_cb` — `ctx` is a
268    // `Box::into_raw`-leaked `AsyncStreamSender<PreferencesNotificationEvent>`
269    // that lives until `SubscriptionHandle::Drop` calls `drop_sender_fn`.
270    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<PreferencesNotificationEvent>>() };
271    doom_fish_utils::panic_safe::catch_user_panic("preferences_async_cb", || {
272        sender.push(PreferencesNotificationEvent {
273            notification: PreferencesNotification::from_raw(u32::from_ne_bytes(kind.to_ne_bytes())),
274        });
275    });
276}
277
278impl PreferencesNotificationStream {
279    /// Wraps `SCPreferencesNotificationSubscribe`.
280    pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
281        let c_name = bridge::cstring(name, "sc_preferences_notification_subscribe")?;
282
283        let (stream, sender) = BoundedAsyncStream::new(capacity);
284        let sender_ptr = Box::into_raw(Box::new(sender));
285
286        let handle_ptr = unsafe {
287            ffi::async_api::sc_preferences_notification_subscribe(
288                c_name.as_ptr(),
289                Some(preferences_async_cb),
290                sender_ptr.cast::<c_void>(),
291            )
292        };
293
294        if handle_ptr.is_null() {
295            unsafe {
296                drop(Box::from_raw(sender_ptr));
297            }
298            return Err(SystemConfigurationError::last(
299                "sc_preferences_notification_subscribe",
300            ));
301        }
302
303        Ok(Self {
304            inner: stream,
305            _handle: SubscriptionHandle::new(
306                handle_ptr,
307                sender_ptr,
308                ffi::async_api::sc_preferences_notification_unsubscribe,
309            ),
310        })
311    }
312
313    /// Wraps the next buffered `SCPreferences` notification.
314    pub const fn next(&self) -> NextItem<'_, PreferencesNotificationEvent> {
315        self.inner.next()
316    }
317
318    /// Wraps a helper on `SCPreferences`.
319    pub fn try_next(&self) -> Option<PreferencesNotificationEvent> {
320        self.inner.try_next()
321    }
322
323    /// Wraps a helper on `SCPreferences`.
324    pub fn buffered_count(&self) -> usize {
325        self.inner.buffered_count()
326    }
327}