drop_stream/
lib.rs

1use futures_core::Stream;
2use pin_project::{pin_project, pinned_drop};
3use std::{
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8/// A stream that wraps another stream with a closure that is called once it is dropped.
9/// Very useful for libraries that use streams for data transfer and you need to connect
10/// when the opposite site drops the connection, thus dropping the stream.
11///
12/// Example
13/// ```
14/// use futures::Stream;
15/// use drop_stream::DropStream;
16///
17/// let test_stream = futures::stream::repeat(true);
18/// {
19///     let wrapped_stream = DropStream::new(test_stream, move || {
20///         println!("Stream has been dropped!");
21///     });
22///
23///     let mut wrapped_stream = Box::pin(wrapped_stream);
24///
25///     let waker = futures::task::noop_waker();
26///     let mut context = futures::task::Context::from_waker(&waker);
27///     assert_eq!(
28///         wrapped_stream.as_mut().poll_next(&mut context),
29///         std::task::Poll::Ready(Some(true))
30///     );
31/// }
32/// ```
33#[pin_project(PinnedDrop)]
34pub struct DropStream<S: Stream<Item = T>, T, U: FnOnce()> {
35    #[pin]
36    stream: S,
37    // Option used to wrap FnOnce since ownership of FnOnce needs to be gained in the Drop::drop() method.
38    dropper: Option<U>,
39}
40
41impl<S: Stream<Item = T>, T, U: FnOnce()> DropStream<S, T, U> {
42    pub fn new(stream: S, dropper: U) -> Self {
43        Self {
44            stream,
45            dropper: Some(dropper),
46        }
47    }
48}
49
50impl<S: Stream<Item = T>, T, U: FnOnce()> Stream for DropStream<S, T, U> {
51    type Item = T;
52
53    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        let stream = self.project().stream;
55        stream.poll_next(cx)
56    }
57}
58
59#[pinned_drop]
60impl<S: Stream<Item = T>, T, U: FnOnce()> PinnedDrop for DropStream<S, T, U> {
61    fn drop(self: Pin<&mut Self>) {
62        let Some(dropper) = self.project().dropper.take() else {
63            // Only taken in the "drop"-method, and always set in the constructor. Thus it cannot be None here.
64            unreachable!()
65        };
66
67        dropper()
68    }
69}
70
71pub trait DropStreamExt<U: FnOnce()>: Stream + Sized {
72    /// Wraps the stream with a closure that is called once it is dropped.
73    /// ex:
74    /// ```rust
75    /// use std::task::Poll;
76    /// use futures::{stream::repeat, Stream};
77    /// use drop_stream::DropStreamExt;
78    ///
79    /// let some_stream = repeat(true);
80    ///
81    /// let mut has_run = false;
82    /// let has_run_ref = &mut has_run;
83    /// let drop_stream = some_stream.on_drop(move || {
84    ///     *has_run_ref = true;
85    ///     println!("Stream has been dropped!")
86    /// });
87    ///
88    /// let mut drop_stream = Box::pin(drop_stream);
89    ///
90    /// // Some stream work and polling...
91    ///
92    /// drop(drop_stream); // Runs the closure
93    /// assert!(has_run);
94    /// ```
95    fn on_drop(self, dropper: U) -> DropStream<Self, Self::Item, U>;
96}
97
98impl<T, U: FnOnce()> DropStreamExt<U> for T
99where
100    T: Stream + Sized,
101{
102    fn on_drop(self, dropper: U) -> DropStream<T, T::Item, U> {
103        DropStream::new(self, dropper)
104    }
105}
106
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use crate::{DropStream, DropStreamExt};
    use futures::{stream::repeat, task::noop_waker, Stream};

    /// Polls `stream` a single time using a waker that does nothing.
    fn poll_once<S: Stream>(stream: Pin<&mut S>) -> Poll<Option<S::Item>> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        stream.poll_next(&mut cx)
    }

    /// Dropping an unpolled wrapper must still run the callback.
    #[test]
    fn dropper_runs_on_drop() {
        let has_run = Cell::new(false);

        let wrapped = DropStream::new(repeat(true), || has_run.set(true));
        drop(wrapped);

        assert!(has_run.get());
    }

    /// Items from the inner stream come out of the wrapper unchanged.
    #[test]
    fn stream_passes_through_result() {
        let mut wrapped = Box::pin(DropStream::new(repeat(true), || {}));

        assert_eq!(poll_once(wrapped.as_mut()), Poll::Ready(Some(true)));
    }

    /// The callback also runs when the wrapper is dropped after being polled.
    #[test]
    fn dropper_runs_on_drop_after_passing_result() {
        let has_run = Cell::new(false);

        {
            let mut wrapped = Box::pin(DropStream::new(repeat(true), || has_run.set(true)));

            assert_eq!(poll_once(wrapped.as_mut()), Poll::Ready(Some(true)));
        }

        assert!(has_run.get());
    }

    /// The `on_drop` extension method builds a working wrapper.
    #[test]
    fn stream_trait_is_implemented() {
        let has_run = Cell::new(false);

        {
            let mut wrapped = Box::pin(repeat(true).on_drop(|| has_run.set(true)));

            assert_eq!(poll_once(wrapped.as_mut()), Poll::Ready(Some(true)));
        }

        assert!(has_run.get());
    }
}