Skip to main content

doom_fish_utils/
stream.rs

1//! Executor-agnostic bounded async streams for FFI callbacks.
2//!
3//! `BoundedAsyncStream<T>` is a generic, runtime-agnostic stream primitive
4//! designed for wrapping Apple SDK callback / delegate / KVO patterns:
5//!
6//! * **Bounded** — backed by a fixed-capacity `VecDeque`. When the buffer
7//!   is full and a new item arrives from the producer, the **oldest**
8//!   queued item is dropped to make room (lossy by design).
9//! * **Waker-driven** — implements `std::future::Future` via a stored
10//!   `Waker`; works with any executor (tokio, async-std, smol, futures,
11//!   etc.) without requiring a runtime feature.
12//! * **`Send + Sync`** — produces and consumes can live on different
13//!   threads, locked by a single `Mutex`.
14//!
15//! The lossy-oldest-drop policy is the right default for real-time event
16//! streams (UI input, frame capture, BLE notifications, location updates):
17//! a slow consumer should always see the latest event, not a stale queue.
18//! When you instead need back-pressure (every event must be delivered),
19//! use [`BoundedAsyncStream::push_or_block`] which blocks the producer
20//! until the consumer drains capacity.
21//!
22//! # Example
23//!
24//! ```no_run
25//! use doom_fish_utils::stream::BoundedAsyncStream;
26//! use std::sync::Arc;
27//!
28//! # async fn run() {
29//! // 8-element ring buffer of `String` events.
30//! let (stream, sender) = BoundedAsyncStream::<String>::new(8);
31//!
32//! // Producer side: typically a Swift delegate / extern "C" callback
33//! // running on a background queue.
34//! std::thread::spawn(move || {
35//!     for i in 0..100 {
36//!         sender.push(format!("event #{i}"));
37//!     }
38//!     drop(sender); // closes the stream
39//! });
40//!
41//! // Consumer side: any async runtime.
42//! while let Some(event) = stream.next().await {
43//!     println!("got {event}");
44//! }
45//! # }
46//! ```
47
48use std::collections::VecDeque;
49use std::fmt;
50use std::future::Future;
51use std::pin::Pin;
52use std::sync::{Arc, Condvar, Mutex};
53use std::task::{Context, Poll, Waker};
54
55/// Backing storage shared between the [`BoundedAsyncStream`] consumer and
56/// every [`AsyncStreamSender`] producer.
57struct State<T> {
58    buffer: VecDeque<T>,
59    waker: Option<Waker>,
60    capacity: usize,
61    /// Set to `true` when every sender has been dropped. The consumer's
62    /// `next()` then returns `None` once the buffer drains.
63    closed: bool,
64}
65
66/// Notifies a producer that's blocked in [`AsyncStreamSender::push_or_block`]
67/// that the consumer made room in the buffer.
68struct BackPressure {
69    cvar: Condvar,
70    /// Set to `true` when the stream is dropped — wakes any blocked
71    /// producers so they can bail out instead of waiting forever.
72    consumer_gone: Mutex<bool>,
73}
74
75/// A bounded, lossy-by-default, executor-agnostic async stream.
76///
77/// Items are pushed by one or more [`AsyncStreamSender`] handles and pulled
78/// asynchronously via [`BoundedAsyncStream::next`].
79///
80/// See the [module-level docs](crate::stream) for the full design rationale.
81pub struct BoundedAsyncStream<T> {
82    state: Arc<Mutex<State<T>>>,
83    back_pressure: Arc<BackPressure>,
84}
85
86/// Producer handle for a [`BoundedAsyncStream`].
87///
88/// Cheap to clone (`Arc` under the hood). Drop the last `AsyncStreamSender`
89/// to close the stream; the consumer's `next()` will yield `None` once the
90/// buffer is empty.
91pub struct AsyncStreamSender<T> {
92    state: Arc<Mutex<State<T>>>,
93    back_pressure: Arc<BackPressure>,
94}
95
96impl<T> Clone for AsyncStreamSender<T> {
97    fn clone(&self) -> Self {
98        Self {
99            state: Arc::clone(&self.state),
100            back_pressure: Arc::clone(&self.back_pressure),
101        }
102    }
103}
104
105impl<T> fmt::Debug for BoundedAsyncStream<T> {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        f.debug_struct("BoundedAsyncStream")
108            .field("buffered", &self.buffered_count())
109            .field("capacity", &self.capacity())
110            .field("is_closed", &self.is_closed())
111            .finish_non_exhaustive()
112    }
113}
114
115impl<T> fmt::Debug for AsyncStreamSender<T> {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("AsyncStreamSender").finish_non_exhaustive()
118    }
119}
120
121impl<T> BoundedAsyncStream<T> {
122    /// Creates a new bounded stream with the given capacity.
123    ///
124    /// Returns the consumer side and a single producer; clone the sender
125    /// to fan out to multiple producers.
126    ///
127    /// # Panics
128    ///
129    /// Panics if `capacity` is 0 — a zero-capacity buffer would drop every
130    /// item before the consumer could observe it. Use capacity 1 if you
131    /// genuinely want "latest only" semantics.
132    #[must_use]
133    pub fn new(capacity: usize) -> (Self, AsyncStreamSender<T>) {
134        assert!(capacity > 0, "BoundedAsyncStream capacity must be > 0");
135
136        let state = Arc::new(Mutex::new(State {
137            buffer: VecDeque::with_capacity(capacity),
138            waker: None,
139            capacity,
140            closed: false,
141        }));
142        let back_pressure = Arc::new(BackPressure {
143            cvar: Condvar::new(),
144            consumer_gone: Mutex::new(false),
145        });
146
147        let stream = Self {
148            state: Arc::clone(&state),
149            back_pressure: Arc::clone(&back_pressure),
150        };
151        let sender = AsyncStreamSender {
152            state,
153            back_pressure,
154        };
155        (stream, sender)
156    }
157
158    /// Returns a future that resolves to the next item, or `None` once the
159    /// stream is closed and drained.
160    #[must_use]
161    pub const fn next(&self) -> NextItem<'_, T> {
162        NextItem { stream: self }
163    }
164
165    /// Non-blocking pop. Returns `None` if the buffer is empty (regardless
166    /// of whether the stream is open or closed).
167    #[must_use]
168    pub fn try_next(&self) -> Option<T> {
169        self.state.lock().ok()?.buffer.pop_front()
170    }
171
172    /// Returns `true` if the stream has been closed (all senders dropped).
173    /// Note: a closed stream may still have buffered items to drain.
174    #[must_use]
175    pub fn is_closed(&self) -> bool {
176        self.state.lock().map_or(true, |s| s.closed)
177    }
178
179    /// Returns the number of items currently buffered (0..=capacity).
180    #[must_use]
181    pub fn buffered_count(&self) -> usize {
182        self.state.lock().map_or(0, |s| s.buffer.len())
183    }
184
185    /// Returns the buffer capacity, as passed to [`Self::new`].
186    #[must_use]
187    pub fn capacity(&self) -> usize {
188        self.state.lock().map_or(0, |s| s.capacity)
189    }
190
191    /// Drops all currently buffered items without closing the stream.
192    pub fn clear_buffer(&self) {
193        if let Ok(mut state) = self.state.lock() {
194            state.buffer.clear();
195        }
196    }
197}
198
199impl<T> Drop for BoundedAsyncStream<T> {
200    fn drop(&mut self) {
201        if let Ok(mut consumer_gone) = self.back_pressure.consumer_gone.lock() {
202            *consumer_gone = true;
203        }
204        self.back_pressure.cvar.notify_all();
205    }
206}
207
208impl<T> AsyncStreamSender<T> {
209    /// Push an item; drops the oldest queued item if the buffer is at
210    /// capacity. This is the lossy default.
211    pub fn push(&self, item: T) {
212        let Ok(mut state) = self.state.lock() else {
213            return;
214        };
215
216        if state.buffer.len() >= state.capacity {
217            state.buffer.pop_front();
218        }
219        state.buffer.push_back(item);
220
221        if let Some(w) = state.waker.take() {
222            w.wake();
223        }
224    }
225
226    /// Push an item, blocking the current thread if the buffer is full
227    /// until the consumer drains an item.
228    ///
229    /// Returns `Err(item)` if the consumer side has been dropped — the
230    /// item is returned to the caller so it isn't leaked.
231    ///
232    /// # Errors
233    ///
234    /// Returns `Err(item)` if the consumer has been dropped.
235    ///
236    /// # Panics
237    ///
238    /// Panics if any mutex is poisoned by another thread panicking while
239    /// holding it.
240    pub fn push_or_block(&self, item: T) -> Result<(), T> {
241        let mut item_slot = Some(item);
242        let Ok(mut state_guard) = self.state.lock() else {
243            return Err(item_slot.take().expect("item present"));
244        };
245
246        loop {
247            // Bail out fast if the consumer is gone.
248            if let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() {
249                if *consumer_gone {
250                    return Err(item_slot.take().expect("item present"));
251                }
252            }
253
254            if state_guard.buffer.len() < state_guard.capacity {
255                let item = item_slot.take().expect("item present");
256                state_guard.buffer.push_back(item);
257                if let Some(w) = state_guard.waker.take() {
258                    w.wake();
259                }
260                return Ok(());
261            }
262
263            // Buffer full — wait for the consumer to make room. We must
264            // release the state mutex before parking on the cvar, otherwise
265            // the consumer can't push items in (and the consumer-drop signal
266            // can't fire either).
267            drop(state_guard);
268            let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() else {
269                return Err(item_slot.take().expect("item present"));
270            };
271            let wait_outcome = self.back_pressure.cvar.wait(consumer_gone);
272            drop(wait_outcome);
273
274            // Re-lock state and check again.
275            state_guard = match self.state.lock() {
276                Ok(g) => g,
277                Err(_) => return Err(item_slot.take().expect("item present")),
278            };
279        }
280    }
281
282    /// Returns the number of items currently buffered.
283    #[must_use]
284    pub fn buffered_count(&self) -> usize {
285        self.state.lock().map_or(0, |s| s.buffer.len())
286    }
287
288    /// Returns `true` if the consumer has been dropped.
289    #[must_use]
290    pub fn is_consumer_gone(&self) -> bool {
291        self.back_pressure.consumer_gone.lock().map_or(true, |g| *g)
292    }
293}
294
295impl<T> Drop for AsyncStreamSender<T> {
296    fn drop(&mut self) {
297        // If this was the last sender, mark the stream closed and wake any
298        // pending consumer so its `next()` returns `None`.
299        let strong = Arc::strong_count(&self.state);
300        if strong == 2 {
301            // exactly one sender (`self`) + the consumer
302            if let Ok(mut state) = self.state.lock() {
303                state.closed = true;
304                if let Some(w) = state.waker.take() {
305                    w.wake();
306                }
307            }
308        }
309        // Wake any back-pressure-blocked clones so they bail out cleanly.
310        self.back_pressure.cvar.notify_all();
311    }
312}
313
314/// Future returned by [`BoundedAsyncStream::next`].
315pub struct NextItem<'a, T> {
316    stream: &'a BoundedAsyncStream<T>,
317}
318
319impl<T> fmt::Debug for NextItem<'_, T> {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        f.debug_struct("NextItem").finish_non_exhaustive()
322    }
323}
324
325impl<T> Future for NextItem<'_, T> {
326    type Output = Option<T>;
327
328    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329        let Ok(mut state) = self.stream.state.lock() else {
330            return Poll::Ready(None);
331        };
332
333        if let Some(item) = state.buffer.pop_front() {
334            // Notify any push_or_block-blocked producers that there's room.
335            self.stream.back_pressure.cvar.notify_all();
336            return Poll::Ready(Some(item));
337        }
338
339        if state.closed {
340            return Poll::Ready(None);
341        }
342
343        // Avoid the lost-wakeup race: when the executor re-polls with a
344        // different waker (e.g. tokio::select! moves the future between
345        // arms), the previous waker would otherwise remain stored and any
346        // pending push would wake the wrong task. `will_wake` skips the
347        // clone if the executor is reusing the same waker.
348        let waker = cx.waker();
349        match state.waker {
350            Some(ref existing) if existing.will_wake(waker) => {}
351            _ => state.waker = Some(waker.clone()),
352        }
353        Poll::Pending
354    }
355}
356
357#[cfg(feature = "futures-stream")]
358impl<T: 'static> futures_core::Stream for BoundedAsyncStream<T> {
359    type Item = T;
360
361    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
362        let Ok(mut state) = self.state.lock() else {
363            return Poll::Ready(None);
364        };
365
366        if let Some(item) = state.buffer.pop_front() {
367            self.back_pressure.cvar.notify_all();
368            return Poll::Ready(Some(item));
369        }
370
371        if state.closed {
372            return Poll::Ready(None);
373        }
374
375        let waker = cx.waker();
376        match state.waker {
377            Some(ref existing) if existing.will_wake(waker) => {}
378            _ => state.waker = Some(waker.clone()),
379        }
380        Poll::Pending
381    }
382}