Skip to main content

doom_fish_utils/
stream.rs

1//! Executor-agnostic bounded async streams for FFI callbacks.
2//!
3//! `BoundedAsyncStream<T>` is a generic, runtime-agnostic stream primitive
4//! designed for wrapping Apple SDK callback / delegate / KVO patterns:
5//!
6//! * **Bounded** — backed by a fixed-capacity `VecDeque`. When the buffer
7//!   is full and a new item arrives from the producer, the **oldest**
8//!   queued item is dropped to make room (lossy by design).
9//! * **Waker-driven** — implements `std::future::Future` via a stored
10//!   `Waker`; works with any executor (tokio, async-std, smol, futures,
11//!   etc.) without requiring a runtime feature.
//! * **`Send + Sync`** — producers and the consumer can live on different
//!   threads; the shared buffer is guarded by a single `Mutex`.
14//!
15//! The lossy-oldest-drop policy is the right default for real-time event
16//! streams (UI input, frame capture, BLE notifications, location updates):
17//! a slow consumer should always see the latest event, not a stale queue.
18//! When you instead need back-pressure (every event must be delivered),
19//! use [`AsyncStreamSender::push_or_block`] which blocks the producer
20//! until the consumer drains capacity.
21//!
22//! # Example
23//!
24//! ```no_run
25//! use doom_fish_utils::stream::BoundedAsyncStream;
26//! use std::sync::Arc;
27//!
28//! # async fn run() {
29//! // 8-element ring buffer of `String` events.
30//! let (stream, sender) = BoundedAsyncStream::<String>::new(8);
31//!
32//! // Producer side: typically a Swift delegate / extern "C" callback
33//! // running on a background queue.
34//! std::thread::spawn(move || {
35//!     for i in 0..100 {
36//!         sender.push(format!("event #{i}"));
37//!     }
38//!     drop(sender); // closes the stream
39//! });
40//!
41//! // Consumer side: any async runtime.
42//! while let Some(event) = stream.next().await {
43//!     println!("got {event}");
44//! }
45//! # }
46//! ```
47
48use std::collections::VecDeque;
49use std::fmt;
50use std::future::Future;
51use std::pin::Pin;
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::sync::{Arc, Condvar, Mutex};
54use std::task::{Context, Poll, Waker};
55
/// State shared, behind one `Mutex`, by the [`BoundedAsyncStream`] consumer
/// and every [`AsyncStreamSender`] producer.
struct State<T> {
    /// Queued items; producers append at the back, the consumer pops from
    /// the front.
    buffer: VecDeque<T>,
    /// Waker registered by the consumer the last time it found the buffer
    /// empty, if any. Taken (and woken) by whoever next pushes or closes.
    waker: Option<Waker>,
    /// Maximum number of items `buffer` is allowed to hold, as given to
    /// `BoundedAsyncStream::new`.
    capacity: usize,
    /// Becomes `true` once the last sender has been dropped. From then on
    /// the consumer's `next()` yields `None` as soon as `buffer` is empty.
    closed: bool,
}
66
/// Coordination data for blocking producers and sender bookkeeping, shared
/// by the consumer and every [`AsyncStreamSender`].
struct BackPressure {
    /// Signalled whenever a producer blocked in
    /// [`AsyncStreamSender::push_or_block`] should re-check the stream: the
    /// consumer took an item, the consumer was dropped, or a sender was
    /// dropped.
    cvar: Condvar,
    /// Flipped to `true` when the [`BoundedAsyncStream`] is dropped, so
    /// blocked producers can give up instead of waiting forever.
    consumer_gone: Mutex<bool>,
    /// Number of live [`AsyncStreamSender`] handles.
    ///
    /// Incremented on clone and decremented on drop; the drop that takes it
    /// from 1 to 0 belongs to the last sender and closes the stream. A
    /// dedicated atomic is used instead of `Arc::strong_count` because a
    /// read-then-decide check on the strong count is a TOCTOU race: two
    /// senders dropped concurrently can each conclude the other is still
    /// alive, leaving the stream open and the consumer waiting forever.
    sender_count: AtomicUsize,
}
83
84/// A bounded, lossy-by-default, executor-agnostic async stream.
85///
86/// Items are pushed by one or more [`AsyncStreamSender`] handles and pulled
87/// asynchronously via [`BoundedAsyncStream::next`].
88///
89/// See the [module-level docs](crate::stream) for the full design rationale.
90pub struct BoundedAsyncStream<T> {
91    state: Arc<Mutex<State<T>>>,
92    back_pressure: Arc<BackPressure>,
93}
94
95/// Producer handle for a [`BoundedAsyncStream`].
96///
97/// Cheap to clone (`Arc` under the hood). Drop the last `AsyncStreamSender`
98/// to close the stream; the consumer's `next()` will yield `None` once the
99/// buffer is empty.
100pub struct AsyncStreamSender<T> {
101    state: Arc<Mutex<State<T>>>,
102    back_pressure: Arc<BackPressure>,
103}
104
105impl<T> Clone for AsyncStreamSender<T> {
106    fn clone(&self) -> Self {
107        self.back_pressure
108            .sender_count
109            .fetch_add(1, Ordering::Relaxed);
110        Self {
111            state: Arc::clone(&self.state),
112            back_pressure: Arc::clone(&self.back_pressure),
113        }
114    }
115}
116
117impl<T> fmt::Debug for BoundedAsyncStream<T> {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("BoundedAsyncStream")
120            .field("buffered", &self.buffered_count())
121            .field("capacity", &self.capacity())
122            .field("is_closed", &self.is_closed())
123            .finish_non_exhaustive()
124    }
125}
126
127impl<T> fmt::Debug for AsyncStreamSender<T> {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        f.debug_struct("AsyncStreamSender").finish_non_exhaustive()
130    }
131}
132
133impl<T> BoundedAsyncStream<T> {
134    /// Creates a new bounded stream with the given capacity.
135    ///
136    /// Returns the consumer side and a single producer; clone the sender
137    /// to fan out to multiple producers.
138    ///
139    /// # Panics
140    ///
141    /// Panics if `capacity` is 0 — a zero-capacity buffer would drop every
142    /// item before the consumer could observe it. Use capacity 1 if you
143    /// genuinely want "latest only" semantics.
144    #[must_use]
145    pub fn new(capacity: usize) -> (Self, AsyncStreamSender<T>) {
146        assert!(capacity > 0, "BoundedAsyncStream capacity must be > 0");
147
148        let state = Arc::new(Mutex::new(State {
149            buffer: VecDeque::with_capacity(capacity),
150            waker: None,
151            capacity,
152            closed: false,
153        }));
154        let back_pressure = Arc::new(BackPressure {
155            cvar: Condvar::new(),
156            consumer_gone: Mutex::new(false),
157            sender_count: AtomicUsize::new(1),
158        });
159
160        let stream = Self {
161            state: Arc::clone(&state),
162            back_pressure: Arc::clone(&back_pressure),
163        };
164        let sender = AsyncStreamSender {
165            state,
166            back_pressure,
167        };
168        (stream, sender)
169    }
170
171    /// Returns a future that resolves to the next item, or `None` once the
172    /// stream is closed and drained.
173    #[must_use]
174    pub const fn next(&self) -> NextItem<'_, T> {
175        NextItem { stream: self }
176    }
177
178    /// Non-blocking pop. Returns `None` if the buffer is empty (regardless
179    /// of whether the stream is open or closed).
180    #[must_use]
181    pub fn try_next(&self) -> Option<T> {
182        self.state.lock().ok()?.buffer.pop_front()
183    }
184
185    /// Returns `true` if the stream has been closed (all senders dropped).
186    /// Note: a closed stream may still have buffered items to drain.
187    #[must_use]
188    pub fn is_closed(&self) -> bool {
189        self.state.lock().map_or(true, |s| s.closed)
190    }
191
192    /// Returns the number of items currently buffered (0..=capacity).
193    #[must_use]
194    pub fn buffered_count(&self) -> usize {
195        self.state.lock().map_or(0, |s| s.buffer.len())
196    }
197
198    /// Returns the buffer capacity, as passed to [`Self::new`].
199    #[must_use]
200    pub fn capacity(&self) -> usize {
201        self.state.lock().map_or(0, |s| s.capacity)
202    }
203
204    /// Drops all currently buffered items without closing the stream.
205    pub fn clear_buffer(&self) {
206        if let Ok(mut state) = self.state.lock() {
207            state.buffer.clear();
208        }
209    }
210}
211
212impl<T> Drop for BoundedAsyncStream<T> {
213    fn drop(&mut self) {
214        if let Ok(mut consumer_gone) = self.back_pressure.consumer_gone.lock() {
215            *consumer_gone = true;
216        }
217        self.back_pressure.cvar.notify_all();
218    }
219}
220
221impl<T> AsyncStreamSender<T> {
222    /// Push an item; drops the oldest queued item if the buffer is at
223    /// capacity. This is the lossy default.
224    pub fn push(&self, item: T) {
225        let Ok(mut state) = self.state.lock() else {
226            return;
227        };
228
229        if state.buffer.len() >= state.capacity {
230            state.buffer.pop_front();
231        }
232        state.buffer.push_back(item);
233
234        if let Some(w) = state.waker.take() {
235            w.wake();
236        }
237    }
238
239    /// Push an item, blocking the current thread if the buffer is full
240    /// until the consumer drains an item.
241    ///
242    /// Returns `Err(item)` if the consumer side has been dropped — the
243    /// item is returned to the caller so it isn't leaked.
244    ///
245    /// # Errors
246    ///
247    /// Returns `Err(item)` if the consumer has been dropped.
248    ///
249    /// # Panics
250    ///
251    /// Panics if any mutex is poisoned by another thread panicking while
252    /// holding it.
253    pub fn push_or_block(&self, item: T) -> Result<(), T> {
254        let mut item_slot = Some(item);
255        let Ok(mut state_guard) = self.state.lock() else {
256            return Err(item_slot.take().expect("item present"));
257        };
258
259        loop {
260            // Bail out fast if the consumer is gone.
261            if let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() {
262                if *consumer_gone {
263                    return Err(item_slot.take().expect("item present"));
264                }
265            }
266
267            if state_guard.buffer.len() < state_guard.capacity {
268                let item = item_slot.take().expect("item present");
269                state_guard.buffer.push_back(item);
270                if let Some(w) = state_guard.waker.take() {
271                    w.wake();
272                }
273                return Ok(());
274            }
275
276            // Buffer full — wait for the consumer to make room. We must
277            // release the state mutex before parking on the cvar, otherwise
278            // the consumer can't push items in (and the consumer-drop signal
279            // can't fire either).
280            drop(state_guard);
281            let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() else {
282                return Err(item_slot.take().expect("item present"));
283            };
284            let wait_outcome = self.back_pressure.cvar.wait(consumer_gone);
285            drop(wait_outcome);
286
287            // Re-lock state and check again.
288            state_guard = match self.state.lock() {
289                Ok(g) => g,
290                Err(_) => return Err(item_slot.take().expect("item present")),
291            };
292        }
293    }
294
295    /// Returns the number of items currently buffered.
296    #[must_use]
297    pub fn buffered_count(&self) -> usize {
298        self.state.lock().map_or(0, |s| s.buffer.len())
299    }
300
301    /// Returns `true` if the consumer has been dropped.
302    #[must_use]
303    pub fn is_consumer_gone(&self) -> bool {
304        self.back_pressure.consumer_gone.lock().map_or(true, |g| *g)
305    }
306}
307
308impl<T> Drop for AsyncStreamSender<T> {
309    fn drop(&mut self) {
310        // Atomically decrement the sender count. The thread that observes the
311        // count dropping from 1 → 0 is, by definition, the last sender; it is
312        // responsible for marking the stream closed and waking the consumer.
313        //
314        // `AcqRel` ordering: the Release half makes all prior pushes visible to
315        // whoever wins the "count → 0" race; the Acquire half ensures we observe
316        // all prior decrements before deciding we are the last sender.
317        //
318        // This replaces the previous `Arc::strong_count` check, which had a
319        // TOCTOU race: two concurrently-dropped last senders could both observe
320        // strong_count == 3 (A + B + consumer), decide neither was "last", and
321        // leave the consumer blocked forever.
322        let prev = self
323            .back_pressure
324            .sender_count
325            .fetch_sub(1, Ordering::AcqRel);
326        if prev == 1 {
327            // We are the last sender; close the stream and wake the consumer.
328            if let Ok(mut state) = self.state.lock() {
329                state.closed = true;
330                if let Some(w) = state.waker.take() {
331                    w.wake();
332                }
333            }
334        }
335        // Wake any push_or_block-blocked clones so they bail out cleanly.
336        self.back_pressure.cvar.notify_all();
337    }
338}
339
340/// Future returned by [`BoundedAsyncStream::next`].
341pub struct NextItem<'a, T> {
342    stream: &'a BoundedAsyncStream<T>,
343}
344
345impl<T> fmt::Debug for NextItem<'_, T> {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347        f.debug_struct("NextItem").finish_non_exhaustive()
348    }
349}
350
351impl<T> Future for NextItem<'_, T> {
352    type Output = Option<T>;
353
354    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355        let Ok(mut state) = self.stream.state.lock() else {
356            return Poll::Ready(None);
357        };
358
359        if let Some(item) = state.buffer.pop_front() {
360            // Notify any push_or_block-blocked producers that there's room.
361            self.stream.back_pressure.cvar.notify_all();
362            return Poll::Ready(Some(item));
363        }
364
365        if state.closed {
366            return Poll::Ready(None);
367        }
368
369        // Avoid the lost-wakeup race: when the executor re-polls with a
370        // different waker (e.g. tokio::select! moves the future between
371        // arms), the previous waker would otherwise remain stored and any
372        // pending push would wake the wrong task. `will_wake` skips the
373        // clone if the executor is reusing the same waker.
374        let waker = cx.waker();
375        match state.waker {
376            Some(ref existing) if existing.will_wake(waker) => {}
377            _ => state.waker = Some(waker.clone()),
378        }
379        Poll::Pending
380    }
381}
382
#[cfg(feature = "futures-stream")]
impl<T: 'static> futures_core::Stream for BoundedAsyncStream<T> {
    type Item = T;

    /// Polls for the next item by driving a fresh [`NextItem`] future once.
    ///
    /// Delegating keeps the `Stream` and `next().await` paths identical by
    /// construction instead of maintaining two copies of the same
    /// pop / closed-check / waker-registration logic.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `NextItem` holds only a shared reference, so it is `Unpin` and can
        // be pinned on the stack with `Pin::new`. It keeps no state of its
        // own between polls, so building a new one per call loses nothing.
        let mut pending = Self::next(&self);
        Pin::new(&mut pending).poll(cx)
    }
}