doom_fish_utils/stream.rs
1//! Executor-agnostic bounded async streams for FFI callbacks.
2//!
3//! `BoundedAsyncStream<T>` is a generic, runtime-agnostic stream primitive
4//! designed for wrapping Apple SDK callback / delegate / KVO patterns:
5//!
6//! * **Bounded** — backed by a fixed-capacity `VecDeque`. When the buffer
7//! is full and a new item arrives from the producer, the **oldest**
8//! queued item is dropped to make room (lossy by design).
9//! * **Waker-driven** — implements `std::future::Future` via a stored
10//! `Waker`; works with any executor (tokio, async-std, smol, futures,
11//! etc.) without requiring a runtime feature.
//! * **`Send + Sync`** — producers and the consumer can live on different
//!   threads; the shared queue is guarded by a `Mutex`.
14//!
15//! The lossy-oldest-drop policy is the right default for real-time event
16//! streams (UI input, frame capture, BLE notifications, location updates):
17//! a slow consumer should always see the latest event, not a stale queue.
//! When you instead need back-pressure (every event must be delivered),
//! use [`AsyncStreamSender::push_or_block`], which blocks the producer
//! thread until the consumer frees capacity.
21//!
22//! # Example
23//!
24//! ```no_run
25//! use doom_fish_utils::stream::BoundedAsyncStream;
26//! use std::sync::Arc;
27//!
28//! # async fn run() {
29//! // 8-element ring buffer of `String` events.
30//! let (stream, sender) = BoundedAsyncStream::<String>::new(8);
31//!
32//! // Producer side: typically a Swift delegate / extern "C" callback
33//! // running on a background queue.
34//! std::thread::spawn(move || {
35//! for i in 0..100 {
36//! sender.push(format!("event #{i}"));
37//! }
38//! drop(sender); // closes the stream
39//! });
40//!
41//! // Consumer side: any async runtime.
42//! while let Some(event) = stream.next().await {
43//! println!("got {event}");
44//! }
45//! # }
46//! ```
47
48use std::collections::VecDeque;
49use std::fmt;
50use std::future::Future;
51use std::pin::Pin;
52use std::sync::{Arc, Condvar, Mutex};
53use std::task::{Context, Poll, Waker};
54
/// Queue state shared by the [`BoundedAsyncStream`] consumer and all of its
/// [`AsyncStreamSender`] producers. Always accessed through a `Mutex`.
struct State<T> {
    /// Pending items; producers push at the back, the consumer pops from
    /// the front.
    buffer: VecDeque<T>,
    /// Waker registered by the consumer's last pending poll, if any.
    /// Producers take and wake it after enqueueing an item.
    waker: Option<Waker>,
    /// Maximum number of items producers keep in `buffer`.
    capacity: usize,
    /// Becomes `true` once every sender has been dropped. From then on the
    /// consumer's `next()` yields `None` as soon as `buffer` is empty.
    closed: bool,
}
65
/// Signalling channel towards producers parked in
/// [`AsyncStreamSender::push_or_block`].
struct BackPressure {
    /// Notified when the consumer pops an item and when either side of the
    /// stream is dropped.
    cvar: Condvar,
    /// Flipped to `true` when the [`BoundedAsyncStream`] is dropped, so
    /// blocked producers can give up instead of waiting forever.
    consumer_gone: Mutex<bool>,
}
74
75/// A bounded, lossy-by-default, executor-agnostic async stream.
76///
77/// Items are pushed by one or more [`AsyncStreamSender`] handles and pulled
78/// asynchronously via [`BoundedAsyncStream::next`].
79///
80/// See the [module-level docs](crate::stream) for the full design rationale.
81pub struct BoundedAsyncStream<T> {
82 state: Arc<Mutex<State<T>>>,
83 back_pressure: Arc<BackPressure>,
84}
85
86/// Producer handle for a [`BoundedAsyncStream`].
87///
88/// Cheap to clone (`Arc` under the hood). Drop the last `AsyncStreamSender`
89/// to close the stream; the consumer's `next()` will yield `None` once the
90/// buffer is empty.
91pub struct AsyncStreamSender<T> {
92 state: Arc<Mutex<State<T>>>,
93 back_pressure: Arc<BackPressure>,
94}
95
96impl<T> Clone for AsyncStreamSender<T> {
97 fn clone(&self) -> Self {
98 Self {
99 state: Arc::clone(&self.state),
100 back_pressure: Arc::clone(&self.back_pressure),
101 }
102 }
103}
104
105impl<T> fmt::Debug for BoundedAsyncStream<T> {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 f.debug_struct("BoundedAsyncStream")
108 .field("buffered", &self.buffered_count())
109 .field("capacity", &self.capacity())
110 .field("is_closed", &self.is_closed())
111 .finish_non_exhaustive()
112 }
113}
114
115impl<T> fmt::Debug for AsyncStreamSender<T> {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("AsyncStreamSender").finish_non_exhaustive()
118 }
119}
120
121impl<T> BoundedAsyncStream<T> {
122 /// Creates a new bounded stream with the given capacity.
123 ///
124 /// Returns the consumer side and a single producer; clone the sender
125 /// to fan out to multiple producers.
126 ///
127 /// # Panics
128 ///
129 /// Panics if `capacity` is 0 — a zero-capacity buffer would drop every
130 /// item before the consumer could observe it. Use capacity 1 if you
131 /// genuinely want "latest only" semantics.
132 #[must_use]
133 pub fn new(capacity: usize) -> (Self, AsyncStreamSender<T>) {
134 assert!(capacity > 0, "BoundedAsyncStream capacity must be > 0");
135
136 let state = Arc::new(Mutex::new(State {
137 buffer: VecDeque::with_capacity(capacity),
138 waker: None,
139 capacity,
140 closed: false,
141 }));
142 let back_pressure = Arc::new(BackPressure {
143 cvar: Condvar::new(),
144 consumer_gone: Mutex::new(false),
145 });
146
147 let stream = Self {
148 state: Arc::clone(&state),
149 back_pressure: Arc::clone(&back_pressure),
150 };
151 let sender = AsyncStreamSender {
152 state,
153 back_pressure,
154 };
155 (stream, sender)
156 }
157
158 /// Returns a future that resolves to the next item, or `None` once the
159 /// stream is closed and drained.
160 #[must_use]
161 pub const fn next(&self) -> NextItem<'_, T> {
162 NextItem { stream: self }
163 }
164
165 /// Non-blocking pop. Returns `None` if the buffer is empty (regardless
166 /// of whether the stream is open or closed).
167 #[must_use]
168 pub fn try_next(&self) -> Option<T> {
169 self.state.lock().ok()?.buffer.pop_front()
170 }
171
172 /// Returns `true` if the stream has been closed (all senders dropped).
173 /// Note: a closed stream may still have buffered items to drain.
174 #[must_use]
175 pub fn is_closed(&self) -> bool {
176 self.state.lock().map_or(true, |s| s.closed)
177 }
178
179 /// Returns the number of items currently buffered (0..=capacity).
180 #[must_use]
181 pub fn buffered_count(&self) -> usize {
182 self.state.lock().map_or(0, |s| s.buffer.len())
183 }
184
185 /// Returns the buffer capacity, as passed to [`Self::new`].
186 #[must_use]
187 pub fn capacity(&self) -> usize {
188 self.state.lock().map_or(0, |s| s.capacity)
189 }
190
191 /// Drops all currently buffered items without closing the stream.
192 pub fn clear_buffer(&self) {
193 if let Ok(mut state) = self.state.lock() {
194 state.buffer.clear();
195 }
196 }
197}
198
199impl<T> Drop for BoundedAsyncStream<T> {
200 fn drop(&mut self) {
201 if let Ok(mut consumer_gone) = self.back_pressure.consumer_gone.lock() {
202 *consumer_gone = true;
203 }
204 self.back_pressure.cvar.notify_all();
205 }
206}
207
208impl<T> AsyncStreamSender<T> {
209 /// Push an item; drops the oldest queued item if the buffer is at
210 /// capacity. This is the lossy default.
211 pub fn push(&self, item: T) {
212 let Ok(mut state) = self.state.lock() else {
213 return;
214 };
215
216 if state.buffer.len() >= state.capacity {
217 state.buffer.pop_front();
218 }
219 state.buffer.push_back(item);
220
221 if let Some(w) = state.waker.take() {
222 w.wake();
223 }
224 }
225
226 /// Push an item, blocking the current thread if the buffer is full
227 /// until the consumer drains an item.
228 ///
229 /// Returns `Err(item)` if the consumer side has been dropped — the
230 /// item is returned to the caller so it isn't leaked.
231 ///
232 /// # Errors
233 ///
234 /// Returns `Err(item)` if the consumer has been dropped.
235 ///
236 /// # Panics
237 ///
238 /// Panics if any mutex is poisoned by another thread panicking while
239 /// holding it.
240 pub fn push_or_block(&self, item: T) -> Result<(), T> {
241 let mut item_slot = Some(item);
242 let Ok(mut state_guard) = self.state.lock() else {
243 return Err(item_slot.take().expect("item present"));
244 };
245
246 loop {
247 // Bail out fast if the consumer is gone.
248 if let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() {
249 if *consumer_gone {
250 return Err(item_slot.take().expect("item present"));
251 }
252 }
253
254 if state_guard.buffer.len() < state_guard.capacity {
255 let item = item_slot.take().expect("item present");
256 state_guard.buffer.push_back(item);
257 if let Some(w) = state_guard.waker.take() {
258 w.wake();
259 }
260 return Ok(());
261 }
262
263 // Buffer full — wait for the consumer to make room. We must
264 // release the state mutex before parking on the cvar, otherwise
265 // the consumer can't push items in (and the consumer-drop signal
266 // can't fire either).
267 drop(state_guard);
268 let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() else {
269 return Err(item_slot.take().expect("item present"));
270 };
271 let wait_outcome = self.back_pressure.cvar.wait(consumer_gone);
272 drop(wait_outcome);
273
274 // Re-lock state and check again.
275 state_guard = match self.state.lock() {
276 Ok(g) => g,
277 Err(_) => return Err(item_slot.take().expect("item present")),
278 };
279 }
280 }
281
282 /// Returns the number of items currently buffered.
283 #[must_use]
284 pub fn buffered_count(&self) -> usize {
285 self.state.lock().map_or(0, |s| s.buffer.len())
286 }
287
288 /// Returns `true` if the consumer has been dropped.
289 #[must_use]
290 pub fn is_consumer_gone(&self) -> bool {
291 self.back_pressure.consumer_gone.lock().map_or(true, |g| *g)
292 }
293}
294
295impl<T> Drop for AsyncStreamSender<T> {
296 fn drop(&mut self) {
297 // If this was the last sender, mark the stream closed and wake any
298 // pending consumer so its `next()` returns `None`.
299 let strong = Arc::strong_count(&self.state);
300 if strong == 2 {
301 // exactly one sender (`self`) + the consumer
302 if let Ok(mut state) = self.state.lock() {
303 state.closed = true;
304 if let Some(w) = state.waker.take() {
305 w.wake();
306 }
307 }
308 }
309 // Wake any back-pressure-blocked clones so they bail out cleanly.
310 self.back_pressure.cvar.notify_all();
311 }
312}
313
314/// Future returned by [`BoundedAsyncStream::next`].
315pub struct NextItem<'a, T> {
316 stream: &'a BoundedAsyncStream<T>,
317}
318
319impl<T> fmt::Debug for NextItem<'_, T> {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 f.debug_struct("NextItem").finish_non_exhaustive()
322 }
323}
324
325impl<T> Future for NextItem<'_, T> {
326 type Output = Option<T>;
327
328 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
329 let Ok(mut state) = self.stream.state.lock() else {
330 return Poll::Ready(None);
331 };
332
333 if let Some(item) = state.buffer.pop_front() {
334 // Notify any push_or_block-blocked producers that there's room.
335 self.stream.back_pressure.cvar.notify_all();
336 return Poll::Ready(Some(item));
337 }
338
339 if state.closed {
340 return Poll::Ready(None);
341 }
342
343 // Avoid the lost-wakeup race: when the executor re-polls with a
344 // different waker (e.g. tokio::select! moves the future between
345 // arms), the previous waker would otherwise remain stored and any
346 // pending push would wake the wrong task. `will_wake` skips the
347 // clone if the executor is reusing the same waker.
348 let waker = cx.waker();
349 match state.waker {
350 Some(ref existing) if existing.will_wake(waker) => {}
351 _ => state.waker = Some(waker.clone()),
352 }
353 Poll::Pending
354 }
355}
356
#[cfg(feature = "futures-stream")]
impl<T: 'static> futures_core::Stream for BoundedAsyncStream<T> {
    type Item = T;

    /// `Stream` adapter over the same polling logic as
    /// [`BoundedAsyncStream::next`]: yields buffered items, returns
    /// `Ready(None)` once the stream is closed and drained, and otherwise
    /// registers the task's waker and returns `Pending`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `NextItem` holds nothing but a shared reference to the stream,
        // so a fresh one can be created and polled on every call; all
        // persistent state (queue, waker, closed flag) lives in the stream.
        let mut next_item = self.next();
        Pin::new(&mut next_item).poll(cx)
    }
}