1#![cfg(feature = "async")]
2
3use std::ffi::c_void;
8
9use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
10
11use crate::{
12 bridge::{self, CStringArray},
13 error::Result,
14 ffi, PreferencesNotification, ReachabilityFlags, SystemConfigurationError,
15};
16
/// Owns the two raw resources behind one active notification subscription.
///
/// Both pointers are stored type-erased as `*mut c_void`. Each one is paired
/// with the function pointer that the `Drop` impl calls to release it.
struct SubscriptionHandle {
    /// Opaque subscription handle; handed to `unsubscribe_fn` on drop when non-null.
    ptr: *mut c_void,
    /// Releases `ptr`.
    unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
    /// Type-erased sender pointer; handed to `drop_sender_fn` on drop when non-null.
    sender_ptr: *mut c_void,
    /// Releases `sender_ptr`.
    drop_sender_fn: unsafe fn(*mut c_void),
}
23
24impl SubscriptionHandle {
25 fn new<T>(
26 ptr: *mut c_void,
27 sender_ptr: *mut AsyncStreamSender<T>,
28 unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
29 ) -> Self {
30 Self {
31 ptr,
32 sender_ptr: sender_ptr.cast::<c_void>(),
33 unsubscribe_fn,
34 drop_sender_fn: drop_sender::<T>,
35 }
36 }
37}
38
39impl Drop for SubscriptionHandle {
40 fn drop(&mut self) {
41 if !self.ptr.is_null() {
42 unsafe { (self.unsubscribe_fn)(self.ptr) };
43 }
44 if !self.sender_ptr.is_null() {
45 unsafe { (self.drop_sender_fn)(self.sender_ptr) };
46 }
47 }
48}
49
50unsafe impl Send for SubscriptionHandle {}
57unsafe impl Sync for SubscriptionHandle {}
62
63unsafe fn drop_sender<T>(raw: *mut c_void) {
64 if raw.is_null() {
65 return;
66 }
67 unsafe {
68 drop(Box::from_raw(raw.cast::<AsyncStreamSender<T>>()));
69 }
70}
71
/// One dynamic store change notification.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so events can be
/// compared directly (for example in tests or when de-duplicating).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DynamicStoreNotificationEvent {
    /// Keys delivered with the notification; empty when the callback received
    /// no payload.
    pub changed_keys: Vec<String>,
}
78
79pub struct DynamicStoreNotificationStream {
81 inner: BoundedAsyncStream<DynamicStoreNotificationEvent>,
82 _handle: SubscriptionHandle,
83}
84
85unsafe extern "C" fn dynamic_store_async_cb(kind: i32, payload: *mut c_void, ctx: *mut c_void) {
86 if kind != 0 || ctx.is_null() {
87 return;
88 }
89
90 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<DynamicStoreNotificationEvent>>() };
95 let changed_keys = if payload.is_null() {
96 Vec::new()
97 } else {
98 bridge::take_string_array(payload)
99 };
100 doom_fish_utils::panic_safe::catch_user_panic("dynamic_store_async_cb", || {
101 sender.push(DynamicStoreNotificationEvent { changed_keys });
102 });
103}
104
105impl DynamicStoreNotificationStream {
106 pub fn subscribe(
108 name: &str,
109 keys: &[&str],
110 patterns: &[&str],
111 capacity: usize,
112 ) -> Result<Self> {
113 let c_name = bridge::cstring(name, "sc_dynamic_store_notification_subscribe")?;
114 let c_keys = CStringArray::new(keys, "sc_dynamic_store_notification_subscribe")?;
115 let c_patterns = CStringArray::new(patterns, "sc_dynamic_store_notification_subscribe")?;
116
117 let (stream, sender) = BoundedAsyncStream::new(capacity);
118 let sender_ptr = Box::into_raw(Box::new(sender));
119
120 let handle_ptr = unsafe {
121 ffi::async_api::sc_dynamic_store_notification_subscribe(
122 c_name.as_ptr(),
123 c_keys.as_ptr(),
124 c_keys.count(),
125 c_patterns.as_ptr(),
126 c_patterns.count(),
127 Some(dynamic_store_async_cb),
128 sender_ptr.cast::<c_void>(),
129 )
130 };
131
132 if handle_ptr.is_null() {
133 unsafe {
134 drop(Box::from_raw(sender_ptr));
135 }
136 return Err(SystemConfigurationError::last(
137 "sc_dynamic_store_notification_subscribe",
138 ));
139 }
140
141 Ok(Self {
142 inner: stream,
143 _handle: SubscriptionHandle::new(
144 handle_ptr,
145 sender_ptr,
146 ffi::async_api::sc_dynamic_store_notification_unsubscribe,
147 ),
148 })
149 }
150
151 pub const fn next(&self) -> NextItem<'_, DynamicStoreNotificationEvent> {
153 self.inner.next()
154 }
155
156 pub fn try_next(&self) -> Option<DynamicStoreNotificationEvent> {
158 self.inner.try_next()
159 }
160
161 pub fn buffered_count(&self) -> usize {
163 self.inner.buffered_count()
164 }
165}
166
167#[derive(Debug, Clone, Copy)]
168pub struct ReachabilityEvent {
170 pub flags: ReachabilityFlags,
172}
173
174pub struct ReachabilityStream {
176 inner: BoundedAsyncStream<ReachabilityEvent>,
177 _handle: SubscriptionHandle,
178}
179
180unsafe extern "C" fn reachability_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
181 if ctx.is_null() {
182 return;
183 }
184
185 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ReachabilityEvent>>() };
191 doom_fish_utils::panic_safe::catch_user_panic("reachability_async_cb", || {
192 sender.push(ReachabilityEvent {
193 flags: ReachabilityFlags(u32::from_ne_bytes(kind.to_ne_bytes())),
194 });
195 });
196}
197
198impl ReachabilityStream {
199 pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
201 let c_name = bridge::cstring(name, "sc_reachability_notification_subscribe")?;
202
203 let (stream, sender) = BoundedAsyncStream::new(capacity);
204 let sender_ptr = Box::into_raw(Box::new(sender));
205
206 let handle_ptr = unsafe {
207 ffi::async_api::sc_reachability_notification_subscribe(
208 c_name.as_ptr(),
209 Some(reachability_async_cb),
210 sender_ptr.cast::<c_void>(),
211 )
212 };
213
214 if handle_ptr.is_null() {
215 unsafe {
216 drop(Box::from_raw(sender_ptr));
217 }
218 return Err(SystemConfigurationError::last(
219 "sc_reachability_notification_subscribe",
220 ));
221 }
222
223 Ok(Self {
224 inner: stream,
225 _handle: SubscriptionHandle::new(
226 handle_ptr,
227 sender_ptr,
228 ffi::async_api::sc_reachability_notification_unsubscribe,
229 ),
230 })
231 }
232
233 pub const fn next(&self) -> NextItem<'_, ReachabilityEvent> {
235 self.inner.next()
236 }
237
238 pub fn try_next(&self) -> Option<ReachabilityEvent> {
240 self.inner.try_next()
241 }
242
243 pub fn buffered_count(&self) -> usize {
245 self.inner.buffered_count()
246 }
247}
248
249#[derive(Debug, Clone, Copy)]
250pub struct PreferencesNotificationEvent {
252 pub notification: PreferencesNotification,
254}
255
256pub struct PreferencesNotificationStream {
258 inner: BoundedAsyncStream<PreferencesNotificationEvent>,
259 _handle: SubscriptionHandle,
260}
261
262unsafe extern "C" fn preferences_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
263 if ctx.is_null() {
264 return;
265 }
266
267 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<PreferencesNotificationEvent>>() };
271 doom_fish_utils::panic_safe::catch_user_panic("preferences_async_cb", || {
272 sender.push(PreferencesNotificationEvent {
273 notification: PreferencesNotification::from_raw(u32::from_ne_bytes(kind.to_ne_bytes())),
274 });
275 });
276}
277
278impl PreferencesNotificationStream {
279 pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
281 let c_name = bridge::cstring(name, "sc_preferences_notification_subscribe")?;
282
283 let (stream, sender) = BoundedAsyncStream::new(capacity);
284 let sender_ptr = Box::into_raw(Box::new(sender));
285
286 let handle_ptr = unsafe {
287 ffi::async_api::sc_preferences_notification_subscribe(
288 c_name.as_ptr(),
289 Some(preferences_async_cb),
290 sender_ptr.cast::<c_void>(),
291 )
292 };
293
294 if handle_ptr.is_null() {
295 unsafe {
296 drop(Box::from_raw(sender_ptr));
297 }
298 return Err(SystemConfigurationError::last(
299 "sc_preferences_notification_subscribe",
300 ));
301 }
302
303 Ok(Self {
304 inner: stream,
305 _handle: SubscriptionHandle::new(
306 handle_ptr,
307 sender_ptr,
308 ffi::async_api::sc_preferences_notification_unsubscribe,
309 ),
310 })
311 }
312
313 pub const fn next(&self) -> NextItem<'_, PreferencesNotificationEvent> {
315 self.inner.next()
316 }
317
318 pub fn try_next(&self) -> Option<PreferencesNotificationEvent> {
320 self.inner.try_next()
321 }
322
323 pub fn buffered_count(&self) -> usize {
325 self.inner.buffered_count()
326 }
327}