1#![cfg(feature = "async")]
2
3use std::ffi::c_void;
8
9use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
10
11use crate::{
12 bridge::{self, CStringArray},
13 error::Result,
14 ffi, PreferencesNotification, ReachabilityFlags, SystemConfigurationError,
15};
16
/// Type-erased owner of one native notification subscription.
///
/// Each raw pointer is stored next to the function that releases it, so the
/// `Drop` implementation can tear everything down without knowing the event
/// type the subscription was created for.
struct SubscriptionHandle {
    /// Opaque handle returned by the native subscribe call.
    ptr: *mut c_void,
    /// Native function that releases `ptr`.
    unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
    /// Type-erased `*mut AsyncStreamSender<T>`; also handed to the native side
    /// as the callback context.
    sender_ptr: *mut c_void,
    /// `drop_sender::<T>` instantiated for the sender's real event type.
    drop_sender_fn: unsafe fn(*mut c_void),
}
23
24impl SubscriptionHandle {
25 fn new<T>(
26 ptr: *mut c_void,
27 sender_ptr: *mut AsyncStreamSender<T>,
28 unsubscribe_fn: unsafe extern "C" fn(*mut c_void),
29 ) -> Self {
30 Self {
31 ptr,
32 sender_ptr: sender_ptr.cast::<c_void>(),
33 unsubscribe_fn,
34 drop_sender_fn: drop_sender::<T>,
35 }
36 }
37}
38
39impl Drop for SubscriptionHandle {
40 fn drop(&mut self) {
41 if !self.ptr.is_null() {
42 unsafe { (self.unsubscribe_fn)(self.ptr) };
43 }
44 if !self.sender_ptr.is_null() {
45 unsafe { (self.drop_sender_fn)(self.sender_ptr) };
46 }
47 }
48}
49
50unsafe impl Send for SubscriptionHandle {}
57unsafe impl Sync for SubscriptionHandle {}
62
63unsafe fn drop_sender<T>(raw: *mut c_void) {
64 if raw.is_null() {
65 return;
66 }
67 unsafe {
68 drop(Box::from_raw(raw.cast::<AsyncStreamSender<T>>()));
69 }
70}
71
/// One dynamic-store change notification.
///
/// `PartialEq`, `Eq` and `Default` are derived in addition to `Debug` and
/// `Clone` so events can be compared directly; the default value is an event
/// with no keys, which is also what the callback produces for a null payload.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DynamicStoreNotificationEvent {
    /// Keys carried by the notification payload; empty when the payload was
    /// null.
    pub changed_keys: Vec<String>,
}
76
77pub struct DynamicStoreNotificationStream {
78 inner: BoundedAsyncStream<DynamicStoreNotificationEvent>,
79 _handle: SubscriptionHandle,
80}
81
82unsafe extern "C" fn dynamic_store_async_cb(kind: i32, payload: *mut c_void, ctx: *mut c_void) {
83 if kind != 0 || ctx.is_null() {
84 return;
85 }
86
87 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<DynamicStoreNotificationEvent>>() };
92 let changed_keys = if payload.is_null() {
93 Vec::new()
94 } else {
95 bridge::take_string_array(payload)
96 };
97 doom_fish_utils::panic_safe::catch_user_panic(
98 "dynamic_store_async_cb",
99 || sender.push(DynamicStoreNotificationEvent { changed_keys }),
100 );
101}
102
103impl DynamicStoreNotificationStream {
104 pub fn subscribe(
105 name: &str,
106 keys: &[&str],
107 patterns: &[&str],
108 capacity: usize,
109 ) -> Result<Self> {
110 let c_name = bridge::cstring(name, "sc_dynamic_store_notification_subscribe")?;
111 let c_keys = CStringArray::new(keys, "sc_dynamic_store_notification_subscribe")?;
112 let c_patterns = CStringArray::new(patterns, "sc_dynamic_store_notification_subscribe")?;
113
114 let (stream, sender) = BoundedAsyncStream::new(capacity);
115 let sender_ptr = Box::into_raw(Box::new(sender));
116
117 let handle_ptr = unsafe {
118 ffi::async_api::sc_dynamic_store_notification_subscribe(
119 c_name.as_ptr(),
120 c_keys.as_ptr(),
121 c_keys.count(),
122 c_patterns.as_ptr(),
123 c_patterns.count(),
124 Some(dynamic_store_async_cb),
125 sender_ptr.cast::<c_void>(),
126 )
127 };
128
129 if handle_ptr.is_null() {
130 unsafe {
131 drop(Box::from_raw(sender_ptr));
132 }
133 return Err(SystemConfigurationError::last(
134 "sc_dynamic_store_notification_subscribe",
135 ));
136 }
137
138 Ok(Self {
139 inner: stream,
140 _handle: SubscriptionHandle::new(
141 handle_ptr,
142 sender_ptr,
143 ffi::async_api::sc_dynamic_store_notification_unsubscribe,
144 ),
145 })
146 }
147
148 pub const fn next(&self) -> NextItem<'_, DynamicStoreNotificationEvent> {
149 self.inner.next()
150 }
151
152 pub fn try_next(&self) -> Option<DynamicStoreNotificationEvent> {
153 self.inner.try_next()
154 }
155
156 pub fn buffered_count(&self) -> usize {
157 self.inner.buffered_count()
158 }
159}
160
161#[derive(Debug, Clone, Copy)]
162pub struct ReachabilityEvent {
163 pub flags: ReachabilityFlags,
164}
165
166pub struct ReachabilityStream {
167 inner: BoundedAsyncStream<ReachabilityEvent>,
168 _handle: SubscriptionHandle,
169}
170
171unsafe extern "C" fn reachability_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
172 if ctx.is_null() {
173 return;
174 }
175
176 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ReachabilityEvent>>() };
182 doom_fish_utils::panic_safe::catch_user_panic("reachability_async_cb", || {
183 sender.push(ReachabilityEvent {
184 flags: ReachabilityFlags(u32::from_ne_bytes(kind.to_ne_bytes())),
185 });
186 });
187}
188
189impl ReachabilityStream {
190 pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
191 let c_name = bridge::cstring(name, "sc_reachability_notification_subscribe")?;
192
193 let (stream, sender) = BoundedAsyncStream::new(capacity);
194 let sender_ptr = Box::into_raw(Box::new(sender));
195
196 let handle_ptr = unsafe {
197 ffi::async_api::sc_reachability_notification_subscribe(
198 c_name.as_ptr(),
199 Some(reachability_async_cb),
200 sender_ptr.cast::<c_void>(),
201 )
202 };
203
204 if handle_ptr.is_null() {
205 unsafe {
206 drop(Box::from_raw(sender_ptr));
207 }
208 return Err(SystemConfigurationError::last(
209 "sc_reachability_notification_subscribe",
210 ));
211 }
212
213 Ok(Self {
214 inner: stream,
215 _handle: SubscriptionHandle::new(
216 handle_ptr,
217 sender_ptr,
218 ffi::async_api::sc_reachability_notification_unsubscribe,
219 ),
220 })
221 }
222
223 pub const fn next(&self) -> NextItem<'_, ReachabilityEvent> {
224 self.inner.next()
225 }
226
227 pub fn try_next(&self) -> Option<ReachabilityEvent> {
228 self.inner.try_next()
229 }
230
231 pub fn buffered_count(&self) -> usize {
232 self.inner.buffered_count()
233 }
234}
235
236#[derive(Debug, Clone, Copy)]
237pub struct PreferencesNotificationEvent {
238 pub notification: PreferencesNotification,
239}
240
241pub struct PreferencesNotificationStream {
242 inner: BoundedAsyncStream<PreferencesNotificationEvent>,
243 _handle: SubscriptionHandle,
244}
245
246unsafe extern "C" fn preferences_async_cb(kind: i32, _payload: *mut c_void, ctx: *mut c_void) {
247 if ctx.is_null() {
248 return;
249 }
250
251 let sender = unsafe { &*ctx.cast::<AsyncStreamSender<PreferencesNotificationEvent>>() };
255 doom_fish_utils::panic_safe::catch_user_panic("preferences_async_cb", || {
256 sender.push(PreferencesNotificationEvent {
257 notification: PreferencesNotification::from_raw(u32::from_ne_bytes(
258 kind.to_ne_bytes(),
259 )),
260 });
261 });
262}
263
264impl PreferencesNotificationStream {
265 pub fn subscribe(name: &str, capacity: usize) -> Result<Self> {
266 let c_name = bridge::cstring(name, "sc_preferences_notification_subscribe")?;
267
268 let (stream, sender) = BoundedAsyncStream::new(capacity);
269 let sender_ptr = Box::into_raw(Box::new(sender));
270
271 let handle_ptr = unsafe {
272 ffi::async_api::sc_preferences_notification_subscribe(
273 c_name.as_ptr(),
274 Some(preferences_async_cb),
275 sender_ptr.cast::<c_void>(),
276 )
277 };
278
279 if handle_ptr.is_null() {
280 unsafe {
281 drop(Box::from_raw(sender_ptr));
282 }
283 return Err(SystemConfigurationError::last(
284 "sc_preferences_notification_subscribe",
285 ));
286 }
287
288 Ok(Self {
289 inner: stream,
290 _handle: SubscriptionHandle::new(
291 handle_ptr,
292 sender_ptr,
293 ffi::async_api::sc_preferences_notification_unsubscribe,
294 ),
295 })
296 }
297
298 pub const fn next(&self) -> NextItem<'_, PreferencesNotificationEvent> {
299 self.inner.next()
300 }
301
302 pub fn try_next(&self) -> Option<PreferencesNotificationEvent> {
303 self.inner.try_next()
304 }
305
306 pub fn buffered_count(&self) -> usize {
307 self.inner.buffered_count()
308 }
309}