ogre_stream_ext/finalization_callbacks_ext.rs
1//! Adds a new Stream combinator able to call different functions once the Stream ends or is canceled
2//!
//! See [crate::StreamWithFinalizationCallback] for a variant that runs a single callback and does
//! not distinguish between completion and cancellation.
5
6use std::{
7 pin::Pin,
8 task::{Context, Poll},
9};
10use std::sync::atomic::{AtomicBool, Ordering};
11use futures::Stream;
12
/// A `Stream` wrapper that runs one of two caller-supplied closures when the stream finishes:
/// 1) the completion callback, fired at most once, when the inner stream yields `Ready(None)`;
/// 2) the cancellation callback, fired at most once, when the wrapper is dropped before the
///    inner stream has yielded `Ready(None)`.
///
/// Each closure is stored as an `Option<…>` so it can be `take()`n and invoked by value
/// (they are `FnOnce`), which also guarantees it cannot be invoked a second time.
pub struct StreamWithFinalizationCallbacks<S, FComplete, FCancel>
where
    S: Stream,
    FComplete: FnOnce(),
    FCancel: FnOnce(),
{
    /// The wrapped stream; items are forwarded from it unchanged.
    inner: S,
    /// Completion callback; taken and called by `poll_next` when `inner` yields `Ready(None)`.
    complete_fn: Option<FComplete>,
    /// Cancellation callback; taken by `Drop` and called only if `finalized` was still `false`.
    cancel_fn: Option<FCancel>,
    /// Set to `true` by whichever path (completion in `poll_next`, or `Drop`) gets there first.
    /// The path that observes `true` skips its callback, so exactly one of the two callbacks runs.
    ///
    /// NOTE(review): both writers hold exclusive access (`Pin<&mut Self>` in `poll_next`,
    /// `&mut self` in `drop`), so a plain `bool` looks sufficient -- confirm before simplifying.
    finalized: AtomicBool,
}
33
34impl<S, FComplete, FCancel> StreamWithFinalizationCallbacks<S, FComplete, FCancel>
35where
36 S: Stream,
37 FComplete: FnOnce(),
38 FCancel: FnOnce(),
39{
40 /// Construct a new wrapper that:
41 /// - calls `complete_cb` once when `inner.poll_next()` returns `Ready(None)`, and
42 /// - calls `cancel_cb` if the wrapper is dropped before seeing `None`.
43 ///
44 /// If you don’t care about cancellations, pass in something like `|| {}` for `cancel_cb`.
45 pub fn new(inner: S, complete_cb: FComplete, cancel_cb: FCancel) -> Self {
46 StreamWithFinalizationCallbacks {
47 inner,
48 complete_fn: Some(complete_cb),
49 cancel_fn: Some(cancel_cb),
50 finalized: AtomicBool::new(false),
51 }
52 }
53}
54
55impl<S, FComplete, FCancel, T> Stream for StreamWithFinalizationCallbacks<S, FComplete, FCancel>
56where
57 S: Stream<Item = T> + Unpin,
58 FComplete: FnOnce(),
59 FCancel: FnOnce(),
60{
61 type Item = T;
62
63 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
64 // SAFETY:
65 // - We only call `get_unchecked_mut()` because we know:
66 // 1) `StreamWithCallbacks<…>` is structurally pinned (it won’t be moved after pinned),
67 // 2) We never move `inner` or the callback fields out of that pinned memory except by taking them (which is OK),
68 // 3) `inner: S` is `Unpin`, so it’s safe to create a `Pin<&mut S>` from `&mut inner`.
69 //
70 // In other words, after calling `get_unchecked_mut()`, we are free to mutate
71 // the fields through `this`, and then re-pin `inner` via `Pin::new(&mut this.inner)`.
72 let this: &mut Self = unsafe { self.get_unchecked_mut() };
73
74 match Pin::new(&mut this.inner).poll_next(cx) {
75 Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
76
77 Poll::Ready(None) => {
78 // The inner stream is done. If we have not yet called `complete_fn`, do so now.
79 if let Some(complete_fn) = this.complete_fn.take() {
80 let finalized = this.finalized.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)).unwrap_or_default();
81 if !finalized {
82 complete_fn();
83 }
84 }
85 Poll::Ready(None)
86 }
87
88 Poll::Pending => Poll::Pending,
89 }
90 }
91}
92
93impl<S, FComplete, FCancel> Drop for StreamWithFinalizationCallbacks<S, FComplete, FCancel>
94where
95 S: Stream,
96 FComplete: FnOnce(),
97 FCancel: FnOnce(),
98{
99 fn drop(&mut self) {
100 // If we never reached the “finished” state, that means the user dropped the stream early --
101 // so we call `cancel_fn` if it’s still present.
102 if let Some(cancel_fn) = self.cancel_fn.take() {
103 let finalized = self.finalized.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)).unwrap_or_default();
104 if !finalized {
105 cancel_fn();
106 }
107 }
108 }
109}
110
/// Does nothing. Used as the placeholder for whichever callback the caller did not supply:
/// - `on_complete` passes it as the cancellation callback;
/// - `on_cancellation` passes it as the completion callback.
fn no_op() {}
115
116/// The extension trait that adds `.on_complete(...)` and `.on_cancellation(...)` to all `Stream`s.
117///
118/// To use it, do:
119/// use ogre_stream_ext::StreamExtFinalizationCallbacks;
120/// use futures::StreamExt; // if you also want `.map()`, `.filter()`, etc.
121///
122/// Then:
123/// mystream
124/// .map(|x| …)
125/// .on_complete(|| println!("done!"))
126/// .on_cancellation(|| println!("cancelled early!"))
127pub trait StreamExtFinalizationCallbacks: Stream + Sized {
128 fn on_complete<FComplete>(
129 self,
130 complete_cb: FComplete,
131 ) -> StreamWithFinalizationCallbacks<Self, FComplete, impl FnOnce()>
132 where
133 FComplete: FnOnce(),
134 {
135 StreamWithFinalizationCallbacks::new(self, complete_cb, no_op)
136 }
137
138 fn on_cancellation<FCancel>(
139 self,
140 cancel_cb: FCancel,
141 ) -> StreamWithFinalizationCallbacks<Self, impl FnOnce(), FCancel>
142 where
143 FCancel: FnOnce(),
144 {
145 StreamWithFinalizationCallbacks::new(self, no_op, cancel_cb)
146 }
147}
148
149impl<S: Stream> StreamExtFinalizationCallbacks for S {}