ogre_stream_ext/
finalization_callbacks_ext.rs

1//! Adds a new Stream combinator able to call different functions once the Stream ends or is canceled
2//!
3//! Please see [crate::StreamWithFinalizationCallback] for a version that does not distinguish between
4//! completion or cancellation.
5
6use std::{
7    pin::Pin,
8    task::{Context, Poll},
9};
10use std::sync::atomic::{AtomicBool, Ordering};
11use futures::Stream;
12
13/// A Stream wrapper that can call two different closures:
14/// 1) `complete_cb`: fired exactly once when the inner stream returns `None`.
15/// 2) `cancel_cb`: fired exactly once if the wrapper is dropped before seeing `None`.
16///
17/// Internally, we keep each closure as an `Option<…>` so we can `take()` it
18/// and invoke it just one time.
19pub struct StreamWithFinalizationCallbacks<S, FComplete, FCancel>
20where
21    S: Stream,
22    FComplete: FnOnce(),
23    FCancel: FnOnce(),
24{
25    inner: S,
26    complete_fn: Option<FComplete>,
27    cancel_fn: Option<FCancel>,
28    /// Flag: have we already called `complete_cb`? Once `true`, we must not call `cancel_cb`.
29    /// Atomic is used to avoid double-firing when the stream ends gracefully in one thread and
30    /// is immediately dropped by another
31    finalized: AtomicBool,
32}
33
34impl<S, FComplete, FCancel> StreamWithFinalizationCallbacks<S, FComplete, FCancel>
35where
36    S: Stream,
37    FComplete: FnOnce(),
38    FCancel: FnOnce(),
39{
40    /// Construct a new wrapper that:
41    ///  - calls `complete_cb` once when `inner.poll_next()` returns `Ready(None)`, and
42    ///  - calls `cancel_cb` if the wrapper is dropped before seeing `None`.
43    ///
44    /// If you don’t care about cancellations, pass in something like `|| {}` for `cancel_cb`.
45    pub fn new(inner: S, complete_cb: FComplete, cancel_cb: FCancel) -> Self {
46        StreamWithFinalizationCallbacks {
47            inner,
48            complete_fn: Some(complete_cb),
49            cancel_fn: Some(cancel_cb),
50            finalized: AtomicBool::new(false),
51        }
52    }
53}
54
55impl<S, FComplete, FCancel, T> Stream for StreamWithFinalizationCallbacks<S, FComplete, FCancel>
56where
57    S: Stream<Item = T> + Unpin,
58    FComplete: FnOnce(),
59    FCancel: FnOnce(),
60{
61    type Item = T;
62
63    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
64        // SAFETY:
65        //   - We only call `get_unchecked_mut()` because we know:
66        //     1) `StreamWithCallbacks<…>` is structurally pinned (it won’t be moved after pinned),
67        //     2) We never move `inner` or the callback fields out of that pinned memory except by taking them (which is OK),
68        //     3) `inner: S` is `Unpin`, so it’s safe to create a `Pin<&mut S>` from `&mut inner`.
69        //
70        // In other words, after calling `get_unchecked_mut()`, we are free to mutate
71        // the fields through `this`, and then re-pin `inner` via `Pin::new(&mut this.inner)`.
72        let this: &mut Self = unsafe { self.get_unchecked_mut() };
73
74        match Pin::new(&mut this.inner).poll_next(cx) {
75            Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
76
77            Poll::Ready(None) => {
78                // The inner stream is done. If we have not yet called `complete_fn`, do so now.
79                if let Some(complete_fn) = this.complete_fn.take() {
80                    let finalized = this.finalized.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)).unwrap_or_default();
81                    if !finalized {
82                        complete_fn();
83                    }
84                }
85                Poll::Ready(None)
86            }
87
88            Poll::Pending => Poll::Pending,
89        }
90    }
91}
92
93impl<S, FComplete, FCancel> Drop for StreamWithFinalizationCallbacks<S, FComplete, FCancel>
94where
95    S: Stream,
96    FComplete: FnOnce(),
97    FCancel: FnOnce(),
98{
99    fn drop(&mut self) {
100        // If we never reached the “finished” state, that means the user dropped the stream early --
101        // so we call `cancel_fn` if it’s still present.
102        if let Some(cancel_fn) = self.cancel_fn.take() {
103            let finalized = self.finalized.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true)).unwrap_or_default();
104            if !finalized {
105                cancel_fn();
106            }
107        }
108    }
109}
110
111/// A marker “do‐nothing” closure for the opposite callback:
112///  - If the user only registers `.on_complete`, then `cancel_cb` is a no‐op.
113///  - If the user only registers `.on_cancellation`, then `complete_cb` is a no‐op.
114fn no_op() {}
115
116/// The extension trait that adds `.on_complete(...)` and `.on_cancellation(...)` to all `Stream`s.
117///
118/// To use it, do:
119///    use ogre_stream_ext::StreamExtFinalizationCallbacks;
120///    use futures::StreamExt; // if you also want `.map()`, `.filter()`, etc.
121///
122/// Then:
123///    mystream
124///       .map(|x| …)
125///       .on_complete(|| println!("done!"))
126///       .on_cancellation(|| println!("cancelled early!"))
127pub trait StreamExtFinalizationCallbacks: Stream + Sized {
128    fn on_complete<FComplete>(
129        self,
130        complete_cb: FComplete,
131    ) -> StreamWithFinalizationCallbacks<Self, FComplete, impl FnOnce()>
132    where
133        FComplete: FnOnce(),
134    {
135        StreamWithFinalizationCallbacks::new(self, complete_cb, no_op)
136    }
137
138    fn on_cancellation<FCancel>(
139        self,
140        cancel_cb: FCancel,
141    ) -> StreamWithFinalizationCallbacks<Self, impl FnOnce(), FCancel>
142    where
143        FCancel: FnOnce(),
144    {
145        StreamWithFinalizationCallbacks::new(self, no_op, cancel_cb)
146    }
147}
148
149impl<S: Stream> StreamExtFinalizationCallbacks for S {}