doom_fish_utils/stream.rs
1//! Executor-agnostic bounded async streams for FFI callbacks.
2//!
3//! `BoundedAsyncStream<T>` is a generic, runtime-agnostic stream primitive
4//! designed for wrapping Apple SDK callback / delegate / KVO patterns:
5//!
6//! * **Bounded** — backed by a fixed-capacity `VecDeque`. When the buffer
7//! is full and a new item arrives from the producer, the **oldest**
8//! queued item is dropped to make room (lossy by design).
9//! * **Waker-driven** — implements `std::future::Future` via a stored
10//! `Waker`; works with any executor (tokio, async-std, smol, futures,
11//! etc.) without requiring a runtime feature.
//! * **`Send + Sync`** — producers and the consumer can live on different
//!   threads; the shared buffer is guarded by a single `Mutex`.
14//!
15//! The lossy-oldest-drop policy is the right default for real-time event
16//! streams (UI input, frame capture, BLE notifications, location updates):
17//! a slow consumer should always see the latest event, not a stale queue.
18//! When you instead need back-pressure (every event must be delivered),
19//! use [`AsyncStreamSender::push_or_block`] which blocks the producer
20//! until the consumer drains capacity.
21//!
22//! # Example
23//!
24//! ```no_run
25//! use doom_fish_utils::stream::BoundedAsyncStream;
26//! use std::sync::Arc;
27//!
28//! # async fn run() {
29//! // 8-element ring buffer of `String` events.
30//! let (stream, sender) = BoundedAsyncStream::<String>::new(8);
31//!
32//! // Producer side: typically a Swift delegate / extern "C" callback
33//! // running on a background queue.
34//! std::thread::spawn(move || {
35//! for i in 0..100 {
36//! sender.push(format!("event #{i}"));
37//! }
38//! drop(sender); // closes the stream
39//! });
40//!
41//! // Consumer side: any async runtime.
42//! while let Some(event) = stream.next().await {
43//! println!("got {event}");
44//! }
45//! # }
46//! ```
47
48use std::collections::VecDeque;
49use std::fmt;
50use std::future::Future;
51use std::pin::Pin;
52use std::sync::atomic::{AtomicUsize, Ordering};
53use std::sync::{Arc, Condvar, Mutex};
54use std::task::{Context, Poll, Waker};
55
/// Shared state that the [`BoundedAsyncStream`] consumer and all of its
/// [`AsyncStreamSender`] producers access through one `Mutex`.
struct State<T> {
    /// Queued items; producers append at the back, the consumer pops from
    /// the front.
    buffer: VecDeque<T>,
    /// Waker stored by the most recent poll that returned `Pending`, if any.
    waker: Option<Waker>,
    /// Maximum number of items `buffer` is allowed to hold.
    capacity: usize,
    /// Becomes `true` once the last sender has been dropped. From then on
    /// the consumer's `next()` yields `None` as soon as the buffer is empty.
    closed: bool,
}
66
/// Side channel used to wake producers parked in
/// [`AsyncStreamSender::push_or_block`] and to count live sender handles.
struct BackPressure {
    /// Signalled when the consumer pops an item, when the consumer is
    /// dropped, and when a sender is dropped.
    cvar: Condvar,
    /// Flipped to `true` when the stream (consumer) is dropped, so blocked
    /// producers can give up instead of waiting forever.
    consumer_gone: Mutex<bool>,
    /// Number of [`AsyncStreamSender`] handles currently alive.
    ///
    /// An explicit atomic counter is used instead of `Arc::strong_count`
    /// because the latter is racy: two "last" senders dropped concurrently
    /// could each conclude the other is still alive, so neither would mark
    /// the stream `closed` and the consumer would wait forever. An atomic
    /// decrement lets exactly one sender observe the transition to zero,
    /// whatever the interleaving.
    sender_count: AtomicUsize,
}
83
84/// A bounded, lossy-by-default, executor-agnostic async stream.
85///
86/// Items are pushed by one or more [`AsyncStreamSender`] handles and pulled
87/// asynchronously via [`BoundedAsyncStream::next`].
88///
89/// See the [module-level docs](crate::stream) for the full design rationale.
90pub struct BoundedAsyncStream<T> {
91 state: Arc<Mutex<State<T>>>,
92 back_pressure: Arc<BackPressure>,
93}
94
95/// Producer handle for a [`BoundedAsyncStream`].
96///
97/// Cheap to clone (`Arc` under the hood). Drop the last `AsyncStreamSender`
98/// to close the stream; the consumer's `next()` will yield `None` once the
99/// buffer is empty.
100pub struct AsyncStreamSender<T> {
101 state: Arc<Mutex<State<T>>>,
102 back_pressure: Arc<BackPressure>,
103}
104
105impl<T> Clone for AsyncStreamSender<T> {
106 fn clone(&self) -> Self {
107 self.back_pressure
108 .sender_count
109 .fetch_add(1, Ordering::Relaxed);
110 Self {
111 state: Arc::clone(&self.state),
112 back_pressure: Arc::clone(&self.back_pressure),
113 }
114 }
115}
116
117impl<T> fmt::Debug for BoundedAsyncStream<T> {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("BoundedAsyncStream")
120 .field("buffered", &self.buffered_count())
121 .field("capacity", &self.capacity())
122 .field("is_closed", &self.is_closed())
123 .finish_non_exhaustive()
124 }
125}
126
127impl<T> fmt::Debug for AsyncStreamSender<T> {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.debug_struct("AsyncStreamSender").finish_non_exhaustive()
130 }
131}
132
133impl<T> BoundedAsyncStream<T> {
134 /// Creates a new bounded stream with the given capacity.
135 ///
136 /// Returns the consumer side and a single producer; clone the sender
137 /// to fan out to multiple producers.
138 ///
139 /// # Panics
140 ///
141 /// Panics if `capacity` is 0 — a zero-capacity buffer would drop every
142 /// item before the consumer could observe it. Use capacity 1 if you
143 /// genuinely want "latest only" semantics.
144 #[must_use]
145 pub fn new(capacity: usize) -> (Self, AsyncStreamSender<T>) {
146 assert!(capacity > 0, "BoundedAsyncStream capacity must be > 0");
147
148 let state = Arc::new(Mutex::new(State {
149 buffer: VecDeque::with_capacity(capacity),
150 waker: None,
151 capacity,
152 closed: false,
153 }));
154 let back_pressure = Arc::new(BackPressure {
155 cvar: Condvar::new(),
156 consumer_gone: Mutex::new(false),
157 sender_count: AtomicUsize::new(1),
158 });
159
160 let stream = Self {
161 state: Arc::clone(&state),
162 back_pressure: Arc::clone(&back_pressure),
163 };
164 let sender = AsyncStreamSender {
165 state,
166 back_pressure,
167 };
168 (stream, sender)
169 }
170
171 /// Returns a future that resolves to the next item, or `None` once the
172 /// stream is closed and drained.
173 #[must_use]
174 pub const fn next(&self) -> NextItem<'_, T> {
175 NextItem { stream: self }
176 }
177
178 /// Non-blocking pop. Returns `None` if the buffer is empty (regardless
179 /// of whether the stream is open or closed).
180 #[must_use]
181 pub fn try_next(&self) -> Option<T> {
182 self.state.lock().ok()?.buffer.pop_front()
183 }
184
185 /// Returns `true` if the stream has been closed (all senders dropped).
186 /// Note: a closed stream may still have buffered items to drain.
187 #[must_use]
188 pub fn is_closed(&self) -> bool {
189 self.state.lock().map_or(true, |s| s.closed)
190 }
191
192 /// Returns the number of items currently buffered (0..=capacity).
193 #[must_use]
194 pub fn buffered_count(&self) -> usize {
195 self.state.lock().map_or(0, |s| s.buffer.len())
196 }
197
198 /// Returns the buffer capacity, as passed to [`Self::new`].
199 #[must_use]
200 pub fn capacity(&self) -> usize {
201 self.state.lock().map_or(0, |s| s.capacity)
202 }
203
204 /// Drops all currently buffered items without closing the stream.
205 pub fn clear_buffer(&self) {
206 if let Ok(mut state) = self.state.lock() {
207 state.buffer.clear();
208 }
209 }
210}
211
212impl<T> Drop for BoundedAsyncStream<T> {
213 fn drop(&mut self) {
214 if let Ok(mut consumer_gone) = self.back_pressure.consumer_gone.lock() {
215 *consumer_gone = true;
216 }
217 self.back_pressure.cvar.notify_all();
218 }
219}
220
221impl<T> AsyncStreamSender<T> {
222 /// Push an item; drops the oldest queued item if the buffer is at
223 /// capacity. This is the lossy default.
224 pub fn push(&self, item: T) {
225 let Ok(mut state) = self.state.lock() else {
226 return;
227 };
228
229 if state.buffer.len() >= state.capacity {
230 state.buffer.pop_front();
231 }
232 state.buffer.push_back(item);
233
234 if let Some(w) = state.waker.take() {
235 w.wake();
236 }
237 }
238
239 /// Push an item, blocking the current thread if the buffer is full
240 /// until the consumer drains an item.
241 ///
242 /// Returns `Err(item)` if the consumer side has been dropped — the
243 /// item is returned to the caller so it isn't leaked.
244 ///
245 /// # Errors
246 ///
247 /// Returns `Err(item)` if the consumer has been dropped.
248 ///
249 /// # Panics
250 ///
251 /// Panics if any mutex is poisoned by another thread panicking while
252 /// holding it.
253 pub fn push_or_block(&self, item: T) -> Result<(), T> {
254 let mut item_slot = Some(item);
255 let Ok(mut state_guard) = self.state.lock() else {
256 return Err(item_slot.take().expect("item present"));
257 };
258
259 loop {
260 // Bail out fast if the consumer is gone.
261 if let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() {
262 if *consumer_gone {
263 return Err(item_slot.take().expect("item present"));
264 }
265 }
266
267 if state_guard.buffer.len() < state_guard.capacity {
268 let item = item_slot.take().expect("item present");
269 state_guard.buffer.push_back(item);
270 if let Some(w) = state_guard.waker.take() {
271 w.wake();
272 }
273 return Ok(());
274 }
275
276 // Buffer full — wait for the consumer to make room. We must
277 // release the state mutex before parking on the cvar, otherwise
278 // the consumer can't push items in (and the consumer-drop signal
279 // can't fire either).
280 drop(state_guard);
281 let Ok(consumer_gone) = self.back_pressure.consumer_gone.lock() else {
282 return Err(item_slot.take().expect("item present"));
283 };
284 let wait_outcome = self.back_pressure.cvar.wait(consumer_gone);
285 drop(wait_outcome);
286
287 // Re-lock state and check again.
288 state_guard = match self.state.lock() {
289 Ok(g) => g,
290 Err(_) => return Err(item_slot.take().expect("item present")),
291 };
292 }
293 }
294
295 /// Returns the number of items currently buffered.
296 #[must_use]
297 pub fn buffered_count(&self) -> usize {
298 self.state.lock().map_or(0, |s| s.buffer.len())
299 }
300
301 /// Returns `true` if the consumer has been dropped.
302 #[must_use]
303 pub fn is_consumer_gone(&self) -> bool {
304 self.back_pressure.consumer_gone.lock().map_or(true, |g| *g)
305 }
306}
307
308impl<T> Drop for AsyncStreamSender<T> {
309 fn drop(&mut self) {
310 // Atomically decrement the sender count. The thread that observes the
311 // count dropping from 1 → 0 is, by definition, the last sender; it is
312 // responsible for marking the stream closed and waking the consumer.
313 //
314 // `AcqRel` ordering: the Release half makes all prior pushes visible to
315 // whoever wins the "count → 0" race; the Acquire half ensures we observe
316 // all prior decrements before deciding we are the last sender.
317 //
318 // This replaces the previous `Arc::strong_count` check, which had a
319 // TOCTOU race: two concurrently-dropped last senders could both observe
320 // strong_count == 3 (A + B + consumer), decide neither was "last", and
321 // leave the consumer blocked forever.
322 let prev = self
323 .back_pressure
324 .sender_count
325 .fetch_sub(1, Ordering::AcqRel);
326 if prev == 1 {
327 // We are the last sender; close the stream and wake the consumer.
328 if let Ok(mut state) = self.state.lock() {
329 state.closed = true;
330 if let Some(w) = state.waker.take() {
331 w.wake();
332 }
333 }
334 }
335 // Wake any push_or_block-blocked clones so they bail out cleanly.
336 self.back_pressure.cvar.notify_all();
337 }
338}
339
340/// Future returned by [`BoundedAsyncStream::next`].
341pub struct NextItem<'a, T> {
342 stream: &'a BoundedAsyncStream<T>,
343}
344
345impl<T> fmt::Debug for NextItem<'_, T> {
346 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
347 f.debug_struct("NextItem").finish_non_exhaustive()
348 }
349}
350
351impl<T> Future for NextItem<'_, T> {
352 type Output = Option<T>;
353
354 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355 let Ok(mut state) = self.stream.state.lock() else {
356 return Poll::Ready(None);
357 };
358
359 if let Some(item) = state.buffer.pop_front() {
360 // Notify any push_or_block-blocked producers that there's room.
361 self.stream.back_pressure.cvar.notify_all();
362 return Poll::Ready(Some(item));
363 }
364
365 if state.closed {
366 return Poll::Ready(None);
367 }
368
369 // Avoid the lost-wakeup race: when the executor re-polls with a
370 // different waker (e.g. tokio::select! moves the future between
371 // arms), the previous waker would otherwise remain stored and any
372 // pending push would wake the wrong task. `will_wake` skips the
373 // clone if the executor is reusing the same waker.
374 let waker = cx.waker();
375 match state.waker {
376 Some(ref existing) if existing.will_wake(waker) => {}
377 _ => state.waker = Some(waker.clone()),
378 }
379 Poll::Pending
380 }
381}
382
#[cfg(feature = "futures-stream")]
impl<T: 'static> futures_core::Stream for BoundedAsyncStream<T> {
    type Item = T;

    /// `Stream` adapter with the same polling behaviour as the future
    /// returned by `next()`: yield the oldest buffered item, end the stream
    /// when it is closed and empty (or the state mutex is poisoned), and
    /// otherwise store the waker and return `Pending`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let mut shared = match self.state.lock() {
            Ok(guard) => guard,
            Err(_) => return Poll::Ready(None),
        };

        match shared.buffer.pop_front() {
            Some(item) => {
                // Tell producers parked in `push_or_block` that a slot is free.
                self.back_pressure.cvar.notify_all();
                Poll::Ready(Some(item))
            }
            None if shared.closed => Poll::Ready(None),
            None => {
                // Replace the stored waker unless it already wakes this task.
                let incoming = cx.waker();
                let already_registered =
                    matches!(&shared.waker, Some(current) if current.will_wake(incoming));
                if !already_registered {
                    shared.waker = Some(incoming.clone());
                }
                Poll::Pending
            }
        }
    }
}