1use futures_core::Stream;
2use pin_project::{pin_project, pinned_drop};
3use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8#[pin_project(PinnedDrop)]
34pub struct DropStream<S: Stream<Item = T>, T, U: FnOnce()> {
35 #[pin]
36 stream: S,
37 dropper: Option<U>,
39}
40
41impl<S: Stream<Item = T>, T, U: FnOnce()> DropStream<S, T, U> {
42 pub fn new(stream: S, dropper: U) -> Self {
43 Self {
44 stream,
45 dropper: Some(dropper),
46 }
47 }
48}
49
50impl<S: Stream<Item = T>, T, U: FnOnce()> Stream for DropStream<S, T, U> {
51 type Item = T;
52
53 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 let stream = self.project().stream;
55 stream.poll_next(cx)
56 }
57}
58
59#[pinned_drop]
60impl<S: Stream<Item = T>, T, U: FnOnce()> PinnedDrop for DropStream<S, T, U> {
61 fn drop(self: Pin<&mut Self>) {
62 let Some(dropper) = self.project().dropper.take() else {
63 unreachable!()
65 };
66
67 dropper()
68 }
69}
70
71pub trait DropStreamExt<U: FnOnce()>: Stream + Sized {
72 fn on_drop(self, dropper: U) -> DropStream<Self, Self::Item, U>;
96}
97
98impl<T, U: FnOnce()> DropStreamExt<U> for T
99where
100 T: Stream + Sized,
101{
102 fn on_drop(self, dropper: U) -> DropStream<T, T::Item, U> {
103 DropStream::new(self, dropper)
104 }
105}
106
#[cfg(test)]
mod tests {
    use std::task::Poll;

    use crate::{DropStream, DropStreamExt};
    use futures::{stream::repeat, Stream};

    /// Polls a boxed, pinned stream exactly once with a waker that does nothing.
    fn poll_once<S: Stream>(stream: &mut std::pin::Pin<Box<S>>) -> Poll<Option<S::Item>> {
        let noop = futures::task::noop_waker();
        let mut cx = futures::task::Context::from_waker(&noop);
        stream.as_mut().poll_next(&mut cx)
    }

    #[test]
    fn dropper_runs_on_drop() {
        let mut has_run = false;

        // The wrapper is dropped immediately, without ever being polled.
        drop(DropStream::new(repeat(true), || has_run = true));

        assert!(has_run)
    }

    #[test]
    fn stream_passes_through_result() {
        let mut wrapped = Box::pin(DropStream::new(repeat(true), || {}));

        assert_eq!(poll_once(&mut wrapped), Poll::Ready(Some(true)));
    }

    #[test]
    fn dropper_runs_on_drop_after_passing_result() {
        let mut has_run = false;

        {
            let mut wrapped = Box::pin(DropStream::new(repeat(true), || has_run = true));

            assert_eq!(poll_once(&mut wrapped), Poll::Ready(Some(true)));
            // `wrapped` goes out of scope here, which must trigger the closure.
        }

        assert!(has_run)
    }

    #[test]
    fn stream_trait_is_implemented() {
        let mut has_run = false;

        {
            // Same scenario as above, but built through the extension method.
            let mut wrapped = Box::pin(repeat(true).on_drop(|| has_run = true));

            assert_eq!(poll_once(&mut wrapped), Poll::Ready(Some(true)));
        }

        assert!(has_run)
    }
}