Skip to main content

systemconfiguration/
async_api.rs

1#![cfg(feature = "async")]
2
3//! Async event streams for `SystemConfiguration` callbacks.
4//!
5//! Enable this module with `--features async`.
6
7use std::ffi::c_void;
8
9use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
10
11use crate::{
12    bridge::{self, CStringArray},
13    error::Result,
14    ffi, PreferencesNotification, ReachabilityFlags, SystemConfigurationError,
15};
16
/// Type-erased owner of one native subscription and of the boxed sender that
/// the subscription's callback pushes into.
///
/// The handle has no methods that act on a live value; its fields are only
/// consumed when it is dropped.
struct SubscriptionHandle {
    /// Opaque subscription handle handed back by the native subscribe call.
    /// It is only ever passed to `unsubscribe_fn`.
    ptr: *mut c_void,
    /// `Box::into_raw` pointer to an `AsyncStreamSender<T>`, with `T` erased.
    sender_ptr: *mut c_void,
    /// Native function that tears down the subscription identified by `ptr`.
    unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
    /// Frees the allocation behind `sender_ptr`; selected at construction time
    /// so that it still knows the erased `T`.
    drop_sender_fn: unsafe fn(*mut c_void),
}
23
24impl SubscriptionHandle {
25    fn new<T>(
26        ptr: *mut c_void,
27        sender_ptr: *mut AsyncStreamSender<T>,
28        unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
29    ) -> Self {
30        Self {
31            ptr,
32            sender_ptr: sender_ptr.cast::<c_void>(),
33            unsubscribe_fn,
34            drop_sender_fn: drop_sender::<T>,
35        }
36    }
37}
38
39impl Drop for SubscriptionHandle {
40    fn drop(&mut self) {
41        if !self.ptr.is_null() {
42            unsafe { (self.unsubscribe_fn)(self.ptr) };
43        }
44        if !self.sender_ptr.is_null() {
45            unsafe { (self.drop_sender_fn)(self.sender_ptr) };
46        }
47    }
48}
49
50// SAFETY: `SubscriptionHandle` owns two raw pointers that are each used
51// exactly once (in `Drop`).  The `ptr` was returned by a Swift subscribe
52// thunk and is only ever passed back into the matching unsubscribe thunk.
53// The `sender_ptr` was created with `Box::into_raw` and is reconstituted
54// exactly once via `drop_sender_fn`.  Neither pointer is shared or aliased
55// elsewhere, so transferring the handle across thread boundaries is safe.
56unsafe impl Send for SubscriptionHandle {}
57// SAFETY: `SubscriptionHandle` carries no shared mutable state — the two
58// pointers are used only in `Drop` (single-caller, single-use).  It is safe
59// to hold a `&SubscriptionHandle` on multiple threads simultaneously because
60// no method takes `&self`.
61unsafe impl Sync for SubscriptionHandle {}
62
63unsafe fn drop_sender<T>(raw: *mut c_void) {
64    if raw.is_null() {
65        return;
66    }
67    unsafe {
68        drop(Box::from_raw(raw.cast::<AsyncStreamSender<T>>()));
69    }
70}
71
/// One dynamic-store change notification delivered to a
/// [`DynamicStoreNotificationStream`].
///
/// Events compare equal when they carry the same keys in the same order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DynamicStoreNotificationEvent {
    /// Keys carried by the notification payload. Empty when the callback was
    /// invoked without a payload.
    pub changed_keys: Vec<String>,
}
76
77pub struct DynamicStoreNotificationStream {
78    inner: BoundedAsyncStream<DynamicStoreNotificationEvent>,
79    _handle: SubscriptionHandle,
80}
81
82unsafe extern "C" fn dynamic_store_async_cb(kind: i32, payload: *mut c_void, ctx: *mut c_void) {
83    if kind != 0 || ctx.is_null() {
84        return;
85    }
86
87    // SAFETY: `ctx` is the `sender_ptr` created in `subscribe` via
88    // `Box::into_raw`.  It remains valid for the lifetime of the
89    // `SubscriptionHandle` (which outlives every callback invocation because
90    // `unsubscribe` joins the run-loop thread before `Drop` frees the sender).
91    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<DynamicStoreNotificationEvent>>() };
92    let changed_keys = if payload.is_null() {
93        Vec::new()
94    } else {
95        bridge::take_string_array(payload)
96    };
97    doom_fish_utils::panic_safe::catch_user_panic(
98        "dynamic_store_async_cb",
99        || sender.push(DynamicStoreNotificationEvent { changed_keys }),
100    );
101}
102
103impl DynamicStoreNotificationStream {
104    pub fn subscribe(
105        name: &str,
106        keys: &[&str],
107        patterns: &[&str],
108        capacity: usize,
109    ) -> Result<Self> {
110        let c_name = bridge::cstring(name, "sc_dynamic_store_notification_subscribe")?;
111        let c_keys = CStringArray::new(keys, "sc_dynamic_store_notification_subscribe")?;
112        let c_patterns = CStringArray::new(patterns, "sc_dynamic_store_notification_subscribe")?;
113
114        let (stream, sender) = BoundedAsyncStream::new(capacity);
115        let sender_ptr = Box::into_raw(Box::new(sender));
116
117        let handle_ptr = unsafe {
118            ffi::async_api::sc_dynamic_store_notification_subscribe(
119                c_name.as_ptr(),
120                c_keys.as_ptr(),
121                c_keys.count(),
122                c_patterns.as_ptr(),
123                c_patterns.count(),
124                Some(dynamic_store_async_cb),
125                sender_ptr.cast::<c_void>(),
126            )
127        };
128
129        if handle_ptr.is_null() {
130            unsafe {
131                drop(Box::from_raw(sender_ptr));
132            }
133            return Err(SystemConfigurationError::last(
134                "sc_dynamic_store_notification_subscribe",
135            ));
136        }
137
138        Ok(Self {
139            inner: stream,
140            _handle: SubscriptionHandle::new(
141                handle_ptr,
142                sender_ptr,
143                ffi::async_api::sc_dynamic_store_notification_unsubscribe,
144            ),
145        })
146    }
147
148    pub const fn next(&self) -> NextItem<'_, DynamicStoreNotificationEvent> {
149        self.inner.next()
150    }
151
152    pub fn try_next(&self) -> Option<DynamicStoreNotificationEvent> {
153        self.inner.try_next()
154    }
155
156    pub fn buffered_count(&self) -> usize {
157        self.inner.buffered_count()
158    }
159}
160
161#[derive(Debug, Clone, Copy)]
162pub struct ReachabilityEvent {
163    pub flags: ReachabilityFlags,
164}
165
166pub struct ReachabilityStream {
167    inner: BoundedAsyncStream<ReachabilityEvent>,
168    _handle: SubscriptionHandle,
169}
170
171unsafe extern "C" fn reachability_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
172    if ctx.is_null() {
173        return;
174    }
175
176    // SAFETY: same lifetime contract as `dynamic_store_async_cb` — `ctx` is a
177    // `Box::into_raw`-leaked `AsyncStreamSender<ReachabilityEvent>` that lives
178    // until `SubscriptionHandle::Drop` calls `drop_sender_fn`.  Reachability
179    // uses a dispatch queue (not a dedicated thread), so the queue is stopped
180    // before the sender is freed.
181    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ReachabilityEvent>>() };
182    doom_fish_utils::panic_safe::catch_user_panic("reachability_async_cb", || {
183        sender.push(ReachabilityEvent {
184            flags: ReachabilityFlags(u32::from_ne_bytes(kind.to_ne_bytes())),
185        });
186    });
187}
188
189impl ReachabilityStream {
190    pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
191        let c_name = bridge::cstring(name, "sc_reachability_notification_subscribe")?;
192
193        let (stream, sender) = BoundedAsyncStream::new(capacity);
194        let sender_ptr = Box::into_raw(Box::new(sender));
195
196        let handle_ptr = unsafe {
197            ffi::async_api::sc_reachability_notification_subscribe(
198                c_name.as_ptr(),
199                Some(reachability_async_cb),
200                sender_ptr.cast::<c_void>(),
201            )
202        };
203
204        if handle_ptr.is_null() {
205            unsafe {
206                drop(Box::from_raw(sender_ptr));
207            }
208            return Err(SystemConfigurationError::last(
209                "sc_reachability_notification_subscribe",
210            ));
211        }
212
213        Ok(Self {
214            inner: stream,
215            _handle: SubscriptionHandle::new(
216                handle_ptr,
217                sender_ptr,
218                ffi::async_api::sc_reachability_notification_unsubscribe,
219            ),
220        })
221    }
222
223    pub const fn next(&self) -> NextItem<'_, ReachabilityEvent> {
224        self.inner.next()
225    }
226
227    pub fn try_next(&self) -> Option<ReachabilityEvent> {
228        self.inner.try_next()
229    }
230
231    pub fn buffered_count(&self) -> usize {
232        self.inner.buffered_count()
233    }
234}
235
236#[derive(Debug, Clone, Copy)]
237pub struct PreferencesNotificationEvent {
238    pub notification: PreferencesNotification,
239}
240
241pub struct PreferencesNotificationStream {
242    inner: BoundedAsyncStream<PreferencesNotificationEvent>,
243    _handle: SubscriptionHandle,
244}
245
246unsafe extern "C" fn preferences_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
247    if ctx.is_null() {
248        return;
249    }
250
251    // SAFETY: same lifetime contract as `dynamic_store_async_cb` — `ctx` is a
252    // `Box::into_raw`-leaked `AsyncStreamSender<PreferencesNotificationEvent>`
253    // that lives until `SubscriptionHandle::Drop` calls `drop_sender_fn`.
254    let sender = unsafe { &*ctx.cast::<AsyncStreamSender<PreferencesNotificationEvent>>() };
255    doom_fish_utils::panic_safe::catch_user_panic("preferences_async_cb", || {
256        sender.push(PreferencesNotificationEvent {
257            notification: PreferencesNotification::from_raw(u32::from_ne_bytes(
258                kind.to_ne_bytes(),
259            )),
260        });
261    });
262}
263
264impl PreferencesNotificationStream {
265    pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
266        let c_name = bridge::cstring(name, "sc_preferences_notification_subscribe")?;
267
268        let (stream, sender) = BoundedAsyncStream::new(capacity);
269        let sender_ptr = Box::into_raw(Box::new(sender));
270
271        let handle_ptr = unsafe {
272            ffi::async_api::sc_preferences_notification_subscribe(
273                c_name.as_ptr(),
274                Some(preferences_async_cb),
275                sender_ptr.cast::<c_void>(),
276            )
277        };
278
279        if handle_ptr.is_null() {
280            unsafe {
281                drop(Box::from_raw(sender_ptr));
282            }
283            return Err(SystemConfigurationError::last(
284                "sc_preferences_notification_subscribe",
285            ));
286        }
287
288        Ok(Self {
289            inner: stream,
290            _handle: SubscriptionHandle::new(
291                handle_ptr,
292                sender_ptr,
293                ffi::async_api::sc_preferences_notification_unsubscribe,
294            ),
295        })
296    }
297
298    pub const fn next(&self) -> NextItem<'_, PreferencesNotificationEvent> {
299        self.inner.next()
300    }
301
302    pub fn try_next(&self) -> Option<PreferencesNotificationEvent> {
303        self.inner.try_next()
304    }
305
306    pub fn buffered_count(&self) -> usize {
307        self.inner.buffered_count()
308    }
309}