Skip to main content

networkframework/
async_api.rs

1//! Async stream wrappers for Network.framework callback-based APIs.
2//!
3//! Each stream type subscribes to one or more handler-based Apple APIs and
4//! delivers events through a [`BoundedAsyncStream`] that any async runtime
5//! can `.await` on.
6//!
7//! # Feature gate
8//!
9//! All types in this module require the **`async`** cargo feature.
10//!
11//! # Back-pressure policy
12//!
13//! Streams are **lossy by default**: when the internal buffer is full the
14//! oldest event is dropped to make room for the newest. Choose a capacity
15//! large enough that your consumer can drain it between bursts.
16//!
17//! # Drop semantics
18//!
19//! Dropping any of the stream types automatically unsubscribes from the
20//! underlying Network.framework handler and frees the associated sender.
21
22#![cfg(feature = "async")]
23
24use core::ffi::{c_int, c_void};
25use core::fmt;
26use core::marker::PhantomData;
27use core::ptr;
28use doom_fish_utils::panic_safe::catch_user_panic;
29use doom_fish_utils::stream::{AsyncStreamSender, BoundedAsyncStream, NextItem};
30
31use crate::browser::{BrowseResult, BrowseResultChange, BrowserState};
32use crate::error::FrameworkError;
33use crate::ffi;
34
35struct SubscriptionHandle {
36    cleanup: Option<Box<dyn FnOnce() + Send>>,
37}
38
39impl Drop for SubscriptionHandle {
40    fn drop(&mut self) {
41        if let Some(cleanup) = self.cleanup.take() {
42            cleanup();
43        }
44    }
45}
46
47// SAFETY: `SubscriptionHandle` only stores a `Send` cleanup closure and moves
48// it to the dropping thread; no raw pointers are shared here.
49unsafe impl Send for SubscriptionHandle {}
50// SAFETY: shared references never execute the cleanup closure. It is only taken
51// and run during `Drop`, which requires unique access.
52unsafe impl Sync for SubscriptionHandle {}
53
54impl fmt::Debug for SubscriptionHandle {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        f.debug_struct("SubscriptionHandle")
57            .field("has_cleanup", &self.cleanup.is_some())
58            .finish_non_exhaustive()
59    }
60}
61
62/// State of an `nw_connection_t`.
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum ConnectionState {
65    Invalid,
66    Waiting,
67    Preparing,
68    Ready,
69    Failed,
70    Cancelled,
71    Unknown(i32),
72}
73
74impl ConnectionState {
75    const fn from_raw(raw: i32) -> Self {
76        match raw {
77            0 => Self::Invalid,
78            1 => Self::Waiting,
79            2 => Self::Preparing,
80            3 => Self::Ready,
81            4 => Self::Failed,
82            5 => Self::Cancelled,
83            other => Self::Unknown(other),
84        }
85    }
86}
87
88/// Event fired when `nw_connection_set_state_changed_handler` fires.
89#[derive(Clone)]
90pub struct ConnectionStateEvent {
91    pub state: ConnectionState,
92    pub error: Option<FrameworkError>,
93}
94
95impl fmt::Debug for ConnectionStateEvent {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        f.debug_struct("ConnectionStateEvent")
98            .field("state", &self.state)
99            .field(
100                "error",
101                &self
102                    .error
103                    .as_ref()
104                    .map(|error| (error.domain(), error.code())),
105            )
106            .finish()
107    }
108}
109
110/// Async stream of [`ConnectionStateEvent`] for a [`crate::client::TcpClient`].
111#[derive(Debug)]
112pub struct ConnectionStateStream<'a> {
113    inner: BoundedAsyncStream<ConnectionStateEvent>,
114    _handle: SubscriptionHandle,
115    _owner: PhantomData<&'a crate::client::TcpClient>,
116}
117
118unsafe extern "C" fn connection_state_cb(state: c_int, error: *mut c_void, ctx: *mut c_void) {
119    if ctx.is_null() {
120        return;
121    }
122
123    catch_user_panic("connection_state_cb", || {
124        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
125        // remains valid until cleanup clears the handler, drains the queue,
126        // and reclaims the sender box.
127        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ConnectionStateEvent>>() };
128        let error = if error.is_null() {
129            None
130        } else {
131            // SAFETY: the shim hands this callback a retained `nw_error_t`
132            // ownership token, which `FrameworkError::from_raw` takes over.
133            Some(unsafe { FrameworkError::from_raw(error) })
134        };
135        sender.push(ConnectionStateEvent {
136            state: ConnectionState::from_raw(state),
137            error,
138        });
139    });
140}
141
142impl<'a> ConnectionStateStream<'a> {
143    /// Subscribe to connection state changes.
144    #[must_use]
145    pub fn subscribe(client: &'a crate::client::TcpClient, capacity: usize) -> Self {
146        let (stream, sender) = BoundedAsyncStream::new(capacity);
147        let sender_ptr = Box::into_raw(Box::new(sender));
148        let obj_ptr = client.as_ptr();
149        let sender_addr = sender_ptr as usize;
150        let obj_addr = obj_ptr as usize;
151        // SAFETY: `obj_ptr` is a live borrowed connection handle and
152        // `sender_ptr` stays valid until the cleanup closure clears the handler,
153        // drains the queue, and frees it.
154        unsafe {
155            ffi::nw_shim_connection_set_state_changed_handler(
156                obj_ptr,
157                Some(connection_state_cb),
158                sender_ptr.cast(),
159            );
160        }
161        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
162            let obj_ptr = obj_addr as *mut c_void;
163            let sender_ptr = sender_addr as *mut AsyncStreamSender<ConnectionStateEvent>;
164            // SAFETY: `sender_ptr` was produced by `Box::into_raw` above. The
165            // handler is cleared and the queue is drained before we reconstruct
166            // the box, so no callback can ever observe `sender_ptr` again.
167            unsafe {
168                ffi::nw_shim_connection_set_state_changed_handler(obj_ptr, None, ptr::null_mut());
169                ffi::nw_shim_connection_drain_queue(obj_ptr);
170                drop(Box::from_raw(sender_ptr));
171            }
172        });
173        Self {
174            inner: stream,
175            _handle: SubscriptionHandle {
176                cleanup: Some(cleanup),
177            },
178            _owner: PhantomData,
179        }
180    }
181
182    /// Asynchronously wait for the next event.
183    #[must_use]
184    pub const fn next(&self) -> NextItem<'_, ConnectionStateEvent> {
185        self.inner.next()
186    }
187
188    /// Try to get an event without blocking.
189    #[must_use]
190    pub fn try_next(&self) -> Option<ConnectionStateEvent> {
191        self.inner.try_next()
192    }
193
194    /// Number of events currently buffered.
195    #[must_use]
196    pub fn buffered_count(&self) -> usize {
197        self.inner.buffered_count()
198    }
199}
200
201/// Async stream of viability changes (`true` = viable) for a [`crate::client::TcpClient`].
202#[derive(Debug)]
203pub struct ConnectionViabilityStream<'a> {
204    inner: BoundedAsyncStream<bool>,
205    _handle: SubscriptionHandle,
206    _owner: PhantomData<&'a crate::client::TcpClient>,
207}
208
209unsafe extern "C" fn connection_viability_cb(value: c_int, ctx: *mut c_void) {
210    if ctx.is_null() {
211        return;
212    }
213
214    catch_user_panic("connection_viability_cb", || {
215        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
216        // remains valid until cleanup clears the handler, drains the queue,
217        // and reclaims the sender box.
218        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<bool>>() };
219        sender.push(value != 0);
220    });
221}
222
223impl<'a> ConnectionViabilityStream<'a> {
224    /// Subscribe to viability changes.
225    #[must_use]
226    pub fn subscribe(client: &'a crate::client::TcpClient, capacity: usize) -> Self {
227        let (stream, sender) = BoundedAsyncStream::new(capacity);
228        let sender_ptr = Box::into_raw(Box::new(sender));
229        let obj_ptr = client.as_ptr();
230        let sender_addr = sender_ptr as usize;
231        let obj_addr = obj_ptr as usize;
232        // SAFETY: `obj_ptr` is a live borrowed connection handle and
233        // `sender_ptr` stays valid until the cleanup closure clears the handler,
234        // drains the queue, and frees it.
235        unsafe {
236            ffi::nw_shim_connection_set_viability_changed_handler(
237                obj_ptr,
238                Some(connection_viability_cb),
239                sender_ptr.cast(),
240            );
241        }
242        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
243            let obj_ptr = obj_addr as *mut c_void;
244            let sender_ptr = sender_addr as *mut AsyncStreamSender<bool>;
245            // SAFETY: `sender_ptr` was produced by `Box::into_raw` above. The
246            // handler is cleared and the queue is drained before we reconstruct
247            // the box, so no callback can ever observe `sender_ptr` again.
248            unsafe {
249                ffi::nw_shim_connection_set_viability_changed_handler(
250                    obj_ptr,
251                    None,
252                    ptr::null_mut(),
253                );
254                ffi::nw_shim_connection_drain_queue(obj_ptr);
255                drop(Box::from_raw(sender_ptr));
256            }
257        });
258        Self {
259            inner: stream,
260            _handle: SubscriptionHandle {
261                cleanup: Some(cleanup),
262            },
263            _owner: PhantomData,
264        }
265    }
266
267    /// Asynchronously wait for the next event.
268    #[must_use]
269    pub const fn next(&self) -> NextItem<'_, bool> {
270        self.inner.next()
271    }
272
273    /// Try to get an event without blocking.
274    #[must_use]
275    pub fn try_next(&self) -> Option<bool> {
276        self.inner.try_next()
277    }
278
279    /// Number of events currently buffered.
280    #[must_use]
281    pub fn buffered_count(&self) -> usize {
282        self.inner.buffered_count()
283    }
284}
285
286/// Async stream of better-path-available events for a [`crate::client::TcpClient`].
287#[derive(Debug)]
288pub struct ConnectionBetterPathStream<'a> {
289    inner: BoundedAsyncStream<bool>,
290    _handle: SubscriptionHandle,
291    _owner: PhantomData<&'a crate::client::TcpClient>,
292}
293
294unsafe extern "C" fn connection_better_path_cb(value: c_int, ctx: *mut c_void) {
295    if ctx.is_null() {
296        return;
297    }
298
299    catch_user_panic("connection_better_path_cb", || {
300        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
301        // remains valid until cleanup clears the handler, drains the queue,
302        // and reclaims the sender box.
303        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<bool>>() };
304        sender.push(value != 0);
305    });
306}
307
308impl<'a> ConnectionBetterPathStream<'a> {
309    /// Subscribe to better-path notifications.
310    #[must_use]
311    pub fn subscribe(client: &'a crate::client::TcpClient, capacity: usize) -> Self {
312        let (stream, sender) = BoundedAsyncStream::new(capacity);
313        let sender_ptr = Box::into_raw(Box::new(sender));
314        let obj_ptr = client.as_ptr();
315        let sender_addr = sender_ptr as usize;
316        let obj_addr = obj_ptr as usize;
317        // SAFETY: `obj_ptr` is a live borrowed connection handle and
318        // `sender_ptr` stays valid until the cleanup closure clears the handler,
319        // drains the queue, and frees it.
320        unsafe {
321            ffi::nw_shim_connection_set_better_path_available_handler(
322                obj_ptr,
323                Some(connection_better_path_cb),
324                sender_ptr.cast(),
325            );
326        }
327        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
328            let obj_ptr = obj_addr as *mut c_void;
329            let sender_ptr = sender_addr as *mut AsyncStreamSender<bool>;
330            // SAFETY: `sender_ptr` was produced by `Box::into_raw` above. The
331            // handler is cleared and the queue is drained before we reconstruct
332            // the box, so no callback can ever observe `sender_ptr` again.
333            unsafe {
334                ffi::nw_shim_connection_set_better_path_available_handler(
335                    obj_ptr,
336                    None,
337                    ptr::null_mut(),
338                );
339                ffi::nw_shim_connection_drain_queue(obj_ptr);
340                drop(Box::from_raw(sender_ptr));
341            }
342        });
343        Self {
344            inner: stream,
345            _handle: SubscriptionHandle {
346                cleanup: Some(cleanup),
347            },
348            _owner: PhantomData,
349        }
350    }
351
352    /// Asynchronously wait for the next event.
353    #[must_use]
354    pub const fn next(&self) -> NextItem<'_, bool> {
355        self.inner.next()
356    }
357
358    /// Try to get an event without blocking.
359    #[must_use]
360    pub fn try_next(&self) -> Option<bool> {
361        self.inner.try_next()
362    }
363
364    /// Number of events currently buffered.
365    #[must_use]
366    pub fn buffered_count(&self) -> usize {
367        self.inner.buffered_count()
368    }
369}
370
371/// Async stream of path-changed events for a [`crate::client::TcpClient`].
372#[derive(Debug)]
373pub struct ConnectionPathChangedStream<'a> {
374    inner: BoundedAsyncStream<crate::path::Path>,
375    _handle: SubscriptionHandle,
376    _owner: PhantomData<&'a crate::client::TcpClient>,
377}
378
379unsafe extern "C" fn connection_path_changed_cb(path: *mut c_void, ctx: *mut c_void) {
380    if path.is_null() || ctx.is_null() {
381        return;
382    }
383
384    catch_user_panic("connection_path_changed_cb", || {
385        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
386        // remains valid until cleanup clears the handler, drains the queue,
387        // and reclaims the sender box.
388        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<crate::path::Path>>() };
389        // SAFETY: the shim passes this callback a retained `nw_path_t`
390        // ownership token, which `Path::from_raw` takes over.
391        let path = unsafe { crate::path::Path::from_raw(path) };
392        sender.push(path);
393    });
394}
395
396impl<'a> ConnectionPathChangedStream<'a> {
397    /// Subscribe to path changes.
398    #[must_use]
399    pub fn subscribe(client: &'a crate::client::TcpClient, capacity: usize) -> Self {
400        let (stream, sender) = BoundedAsyncStream::new(capacity);
401        let sender_ptr = Box::into_raw(Box::new(sender));
402        let obj_ptr = client.as_ptr();
403        let sender_addr = sender_ptr as usize;
404        let obj_addr = obj_ptr as usize;
405        // SAFETY: `obj_ptr` is a live borrowed connection handle and
406        // `sender_ptr` stays valid until the cleanup closure clears the handler,
407        // drains the queue, and frees it.
408        unsafe {
409            ffi::nw_shim_connection_set_path_changed_handler(
410                obj_ptr,
411                Some(connection_path_changed_cb),
412                sender_ptr.cast(),
413            );
414        }
415        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
416            let obj_ptr = obj_addr as *mut c_void;
417            let sender_ptr = sender_addr as *mut AsyncStreamSender<crate::path::Path>;
418            // SAFETY: `sender_ptr` was produced by `Box::into_raw` above. The
419            // handler is cleared and the queue is drained before we reconstruct
420            // the box, so no callback can ever observe `sender_ptr` again.
421            unsafe {
422                ffi::nw_shim_connection_set_path_changed_handler(obj_ptr, None, ptr::null_mut());
423                ffi::nw_shim_connection_drain_queue(obj_ptr);
424                drop(Box::from_raw(sender_ptr));
425            }
426        });
427        Self {
428            inner: stream,
429            _handle: SubscriptionHandle {
430                cleanup: Some(cleanup),
431            },
432            _owner: PhantomData,
433        }
434    }
435
436    /// Asynchronously wait for the next event.
437    #[must_use]
438    pub const fn next(&self) -> NextItem<'_, crate::path::Path> {
439        self.inner.next()
440    }
441
442    /// Try to get an event without blocking.
443    #[must_use]
444    pub fn try_next(&self) -> Option<crate::path::Path> {
445        self.inner.try_next()
446    }
447
448    /// Number of events currently buffered.
449    #[must_use]
450    pub fn buffered_count(&self) -> usize {
451        self.inner.buffered_count()
452    }
453}
454
455/// State of an `nw_listener_t`.
456#[derive(Debug, Clone, Copy, PartialEq, Eq)]
457pub enum ListenerState {
458    Invalid,
459    Waiting,
460    Ready,
461    Failed,
462    Cancelled,
463    Unknown(i32),
464}
465
466impl ListenerState {
467    const fn from_raw(raw: i32) -> Self {
468        match raw {
469            0 => Self::Invalid,
470            1 => Self::Waiting,
471            2 => Self::Ready,
472            3 => Self::Failed,
473            4 => Self::Cancelled,
474            other => Self::Unknown(other),
475        }
476    }
477}
478
479/// Event from a [`ListenerEventStream`].
480pub enum ListenerEvent {
481    /// Listener state changed.
482    State {
483        state: ListenerState,
484        error: Option<FrameworkError>,
485    },
486    /// A new inbound connection was accepted.
487    NewConnection(crate::client::TcpClient),
488}
489
490impl fmt::Debug for ListenerEvent {
491    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492        match self {
493            Self::State { state, error } => f
494                .debug_struct("State")
495                .field("state", state)
496                .field(
497                    "error",
498                    &error.as_ref().map(|error| (error.domain(), error.code())),
499                )
500                .finish(),
501            Self::NewConnection(_) => f.write_str("NewConnection(TcpClient { .. })"),
502        }
503    }
504}
505
506struct ListenerNewConnectionContext {
507    sender: AsyncStreamSender<ListenerEvent>,
508    keepalives: crate::parameters::KeepAlives,
509}
510
511/// Async stream of [`ListenerEvent`] for a [`crate::listener::TcpListener`].
512#[derive(Debug)]
513pub struct ListenerEventStream<'a> {
514    inner: BoundedAsyncStream<ListenerEvent>,
515    _handle: SubscriptionHandle,
516    _owner: PhantomData<&'a crate::listener::TcpListener>,
517}
518
519unsafe extern "C" fn listener_state_cb(state: c_int, error: *mut c_void, ctx: *mut c_void) {
520    if ctx.is_null() {
521        return;
522    }
523
524    catch_user_panic("listener_state_cb", || {
525        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
526        // remains valid until cleanup clears the handler, drains the queue,
527        // and reclaims the sender box.
528        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<ListenerEvent>>() };
529        let error = if error.is_null() {
530            None
531        } else {
532            // SAFETY: the shim hands this callback a retained `nw_error_t`
533            // ownership token, which `FrameworkError::from_raw` takes over.
534            Some(unsafe { FrameworkError::from_raw(error) })
535        };
536        sender.push(ListenerEvent::State {
537            state: ListenerState::from_raw(state),
538            error,
539        });
540    });
541}
542
543unsafe extern "C" fn listener_new_connection_cb(connection_handle: *mut c_void, ctx: *mut c_void) {
544    if connection_handle.is_null() || ctx.is_null() {
545        return;
546    }
547
548    catch_user_panic("listener_new_connection_cb", || {
549        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
550        // remains valid until cleanup clears the handler, drains the queue,
551        // and reclaims the context box.
552        let ctx = unsafe { &*ctx.cast::<ListenerNewConnectionContext>() };
553        // SAFETY: `connection_handle` is a live accepted-connection handle
554        // produced by the listener shim, and ownership transfers to the new
555        // `TcpClient` wrapper.
556        let client = unsafe {
557            crate::client::TcpClient::from_raw_with_keepalives(
558                connection_handle,
559                ctx.keepalives.clone(),
560            )
561        };
562        ctx.sender.push(ListenerEvent::NewConnection(client));
563    });
564}
565
566impl<'a> ListenerEventStream<'a> {
567    /// Subscribe to listener state changes and inbound connections.
568    #[must_use]
569    pub fn subscribe(listener: &'a crate::listener::TcpListener, capacity: usize) -> Self {
570        let (stream, sender) = BoundedAsyncStream::new(capacity);
571        let state_ptr = Box::into_raw(Box::new(sender.clone()));
572        let conn_ptr = Box::into_raw(Box::new(ListenerNewConnectionContext {
573            sender,
574            keepalives: listener.keepalives(),
575        }));
576        let obj_ptr = listener.as_ptr();
577        let state_addr = state_ptr as usize;
578        let conn_addr = conn_ptr as usize;
579        let obj_addr = obj_ptr as usize;
580        // SAFETY: `obj_ptr` is a live borrowed listener handle, and both box
581        // pointers stay valid until the cleanup closure clears the handlers,
582        // drains the queue, and frees them.
583        unsafe {
584            ffi::nw_shim_listener_set_state_changed_handler(
585                obj_ptr,
586                Some(listener_state_cb),
587                state_ptr.cast(),
588            );
589            ffi::nw_shim_listener_set_new_connection_handler(
590                obj_ptr,
591                Some(listener_new_connection_cb),
592                conn_ptr.cast(),
593            );
594        }
595        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
596            let obj_ptr = obj_addr as *mut c_void;
597            let state_ptr = state_addr as *mut AsyncStreamSender<ListenerEvent>;
598            let conn_ptr = conn_addr as *mut ListenerNewConnectionContext;
599            // SAFETY: `state_ptr` and `conn_ptr` were produced by `Box::into_raw`
600            // above. Both handlers are cleared and the queue is drained before
601            // we reconstruct the boxes, so no callback can ever observe these
602            // pointers again.
603            unsafe {
604                ffi::nw_shim_listener_set_state_changed_handler(obj_ptr, None, ptr::null_mut());
605                ffi::nw_shim_listener_set_new_connection_handler(obj_ptr, None, ptr::null_mut());
606                ffi::nw_shim_listener_drain_queue(obj_ptr);
607                drop(Box::from_raw(state_ptr));
608                drop(Box::from_raw(conn_ptr));
609            }
610        });
611        Self {
612            inner: stream,
613            _handle: SubscriptionHandle {
614                cleanup: Some(cleanup),
615            },
616            _owner: PhantomData,
617        }
618    }
619
620    /// Asynchronously wait for the next event.
621    #[must_use]
622    pub const fn next(&self) -> NextItem<'_, ListenerEvent> {
623        self.inner.next()
624    }
625
626    /// Try to get an event without blocking.
627    #[must_use]
628    pub fn try_next(&self) -> Option<ListenerEvent> {
629        self.inner.try_next()
630    }
631
632    /// Number of events currently buffered.
633    #[must_use]
634    pub fn buffered_count(&self) -> usize {
635        self.inner.buffered_count()
636    }
637}
638
639/// Async stream of path updates from a [`crate::path_monitor::PathMonitor`].
640#[derive(Debug)]
641pub struct PathUpdateStream<'a> {
642    inner: BoundedAsyncStream<crate::path::Path>,
643    _handle: SubscriptionHandle,
644    _owner: PhantomData<&'a crate::path_monitor::PathMonitor>,
645}
646
647unsafe extern "C" fn path_update_cb(path: *mut c_void, ctx: *mut c_void) {
648    if path.is_null() || ctx.is_null() {
649        return;
650    }
651
652    catch_user_panic("path_update_cb", || {
653        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
654        // remains valid until cleanup clears the handler, drains the queue,
655        // and reclaims the sender box.
656        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<crate::path::Path>>() };
657        // SAFETY: the shim passes this callback a retained `nw_path_t`
658        // ownership token, which `Path::from_raw` takes over.
659        let path = unsafe { crate::path::Path::from_raw(path) };
660        sender.push(path);
661    });
662}
663
664impl<'a> PathUpdateStream<'a> {
665    /// Subscribe to path monitor updates.
666    #[must_use]
667    pub fn subscribe(monitor: &'a crate::path_monitor::PathMonitor, capacity: usize) -> Self {
668        let (stream, sender) = BoundedAsyncStream::new(capacity);
669        let sender_ptr = Box::into_raw(Box::new(sender));
670        let obj_ptr = monitor.as_ptr();
671        let sender_addr = sender_ptr as usize;
672        let obj_addr = obj_ptr as usize;
673        // SAFETY: `obj_ptr` is a live borrowed path-monitor handle and
674        // `sender_ptr` stays valid until the cleanup closure clears the handler,
675        // drains the queue, and frees it.
676        unsafe {
677            ffi::nw_shim_path_monitor_set_update_handler(
678                obj_ptr,
679                Some(path_update_cb),
680                sender_ptr.cast(),
681            );
682        }
683        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
684            let obj_ptr = obj_addr as *mut c_void;
685            let sender_ptr = sender_addr as *mut AsyncStreamSender<crate::path::Path>;
686            // SAFETY: `sender_ptr` was produced by `Box::into_raw` above. The
687            // handler is cleared and the queue is drained before we reconstruct
688            // the box, so no callback can ever observe `sender_ptr` again.
689            unsafe {
690                ffi::nw_shim_path_monitor_set_update_handler(obj_ptr, None, ptr::null_mut());
691                ffi::nw_shim_path_monitor_drain_queue(obj_ptr);
692                drop(Box::from_raw(sender_ptr));
693            }
694        });
695        Self {
696            inner: stream,
697            _handle: SubscriptionHandle {
698                cleanup: Some(cleanup),
699            },
700            _owner: PhantomData,
701        }
702    }
703
704    /// Asynchronously wait for the next event.
705    #[must_use]
706    pub const fn next(&self) -> NextItem<'_, crate::path::Path> {
707        self.inner.next()
708    }
709
710    /// Try to get an event without blocking.
711    #[must_use]
712    pub fn try_next(&self) -> Option<crate::path::Path> {
713        self.inner.try_next()
714    }
715
716    /// Number of events currently buffered.
717    #[must_use]
718    pub fn buffered_count(&self) -> usize {
719        self.inner.buffered_count()
720    }
721}
722
723/// Event from a [`BrowserEventStream`].
724#[derive(Clone)]
725pub enum BrowserAsyncEvent {
726    /// Browser state changed.
727    State {
728        state: BrowserState,
729        error: Option<FrameworkError>,
730    },
731    /// Browse results changed.
732    Results {
733        old_result: Option<BrowseResult>,
734        new_result: Option<BrowseResult>,
735        changes: BrowseResultChange,
736        batch_complete: bool,
737    },
738}
739
740impl fmt::Debug for BrowserAsyncEvent {
741    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
742        match self {
743            Self::State { state, error } => f
744                .debug_struct("State")
745                .field("state", state)
746                .field(
747                    "error",
748                    &error.as_ref().map(|error| (error.domain(), error.code())),
749                )
750                .finish(),
751            Self::Results {
752                old_result,
753                new_result,
754                changes,
755                batch_complete,
756            } => f
757                .debug_struct("Results")
758                .field("old_result_present", &old_result.is_some())
759                .field("new_result_present", &new_result.is_some())
760                .field("changes", changes)
761                .field("batch_complete", batch_complete)
762                .finish(),
763        }
764    }
765}
766
767/// Async stream of [`BrowserAsyncEvent`] from a [`crate::browser::Browser`].
768#[derive(Debug)]
769pub struct BrowserEventStream<'a> {
770    inner: BoundedAsyncStream<BrowserAsyncEvent>,
771    _handle: SubscriptionHandle,
772    _owner: PhantomData<&'a crate::browser::Browser>,
773}
774
775unsafe extern "C" fn browser_state_cb(state: c_int, error: *mut c_void, ctx: *mut c_void) {
776    if ctx.is_null() {
777        return;
778    }
779
780    catch_user_panic("browser_state_cb", || {
781        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
782        // remains valid until cleanup clears the handler, drains the queue,
783        // and reclaims the sender box.
784        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<BrowserAsyncEvent>>() };
785        let error = if error.is_null() {
786            None
787        } else {
788            // SAFETY: the shim hands this callback a retained `nw_error_t`
789            // ownership token, which `FrameworkError::from_raw` takes over.
790            Some(unsafe { FrameworkError::from_raw(error) })
791        };
792        sender.push(BrowserAsyncEvent::State {
793            state: BrowserState::from_raw(state),
794            error,
795        });
796    });
797}
798
799unsafe extern "C" fn browser_results_cb(
800    old_result: *mut c_void,
801    new_result: *mut c_void,
802    changes: u64,
803    batch_complete: c_int,
804    ctx: *mut c_void,
805) {
806    if ctx.is_null() {
807        return;
808    }
809
810    catch_user_panic("browser_results_cb", || {
811        // SAFETY: `ctx` was created by `Box::into_raw` in `subscribe` and
812        // remains valid until cleanup clears the handler, drains the queue,
813        // and reclaims the sender box.
814        let sender = unsafe { &*ctx.cast::<AsyncStreamSender<BrowserAsyncEvent>>() };
815        let old_result = if old_result.is_null() {
816            None
817        } else {
818            // SAFETY: the shim passes retained browse-result handles to the
819            // callback, and ownership transfers to the Rust wrappers.
820            Some(unsafe { BrowseResult::from_raw(old_result) })
821        };
822        let new_result = if new_result.is_null() {
823            None
824        } else {
825            // SAFETY: the shim passes retained browse-result handles to the
826            // callback, and ownership transfers to the Rust wrappers.
827            Some(unsafe { BrowseResult::from_raw(new_result) })
828        };
829        sender.push(BrowserAsyncEvent::Results {
830            old_result,
831            new_result,
832            changes: BrowseResultChange::from_raw(changes),
833            batch_complete: batch_complete != 0,
834        });
835    });
836}
837
838impl<'a> BrowserEventStream<'a> {
839    /// Subscribe to browser state and browse-result updates.
840    #[must_use]
841    pub fn subscribe(browser: &'a crate::browser::Browser, capacity: usize) -> Self {
842        let (stream, sender) = BoundedAsyncStream::new(capacity);
843        let state_ptr = Box::into_raw(Box::new(sender.clone()));
844        let results_ptr = Box::into_raw(Box::new(sender));
845        let obj_ptr = browser.as_ptr();
846        let state_addr = state_ptr as usize;
847        let results_addr = results_ptr as usize;
848        let obj_addr = obj_ptr as usize;
849        // SAFETY: `obj_ptr` is a live borrowed browser handle, and both box
850        // pointers stay valid until the cleanup closure clears the handlers,
851        // drains the queue, and frees them.
852        unsafe {
853            ffi::nw_shim_browser_set_state_changed_handler(
854                obj_ptr,
855                Some(browser_state_cb),
856                state_ptr.cast(),
857            );
858            ffi::nw_shim_browser_set_browse_results_changed_handler(
859                obj_ptr,
860                Some(browser_results_cb),
861                results_ptr.cast(),
862            );
863        }
864        let cleanup: Box<dyn FnOnce() + Send> = Box::new(move || {
865            let obj_ptr = obj_addr as *mut c_void;
866            let state_ptr = state_addr as *mut AsyncStreamSender<BrowserAsyncEvent>;
867            let results_ptr = results_addr as *mut AsyncStreamSender<BrowserAsyncEvent>;
868            // SAFETY: `state_ptr` and `results_ptr` were produced by
869            // `Box::into_raw` above. Both handlers are cleared and the queue is
870            // drained before we reconstruct the boxes, so no callback can ever
871            // observe these pointers again.
872            unsafe {
873                ffi::nw_shim_browser_set_state_changed_handler(obj_ptr, None, ptr::null_mut());
874                ffi::nw_shim_browser_set_browse_results_changed_handler(
875                    obj_ptr,
876                    None,
877                    ptr::null_mut(),
878                );
879                ffi::nw_shim_browser_drain_queue(obj_ptr);
880                drop(Box::from_raw(state_ptr));
881                drop(Box::from_raw(results_ptr));
882            }
883        });
884        Self {
885            inner: stream,
886            _handle: SubscriptionHandle {
887                cleanup: Some(cleanup),
888            },
889            _owner: PhantomData,
890        }
891    }
892
893    /// Asynchronously wait for the next event.
894    #[must_use]
895    pub const fn next(&self) -> NextItem<'_, BrowserAsyncEvent> {
896        self.inner.next()
897    }
898
899    /// Try to get an event without blocking.
900    #[must_use]
901    pub fn try_next(&self) -> Option<BrowserAsyncEvent> {
902        self.inner.try_next()
903    }
904
905    /// Number of events currently buffered.
906    #[must_use]
907    pub fn buffered_count(&self) -> usize {
908        self.inner.buffered_count()
909    }
910}