1use std::{pin::Pin, task::{Context, Poll}};
28
29use futures::Stream;
30use pin_project::{pin_project, pinned_drop};
31
32#[pin_project(PinnedDrop)]
34pub struct StreamGuard<S, F> where S: Stream, F: FnOnce() {
35 #[pin]
36 stream: S,
37 on_drop: Option<F>,
38}
39
40impl<S, F> StreamGuard<S, F> where S: Stream, F: FnOnce() {
41 pub fn new(stream: S, on_drop: F) -> Self {
43 Self { stream, on_drop: Some(on_drop) }
44 }
45}
46
47impl<S, F> Stream for StreamGuard<S, F> where S: Stream, F: FnOnce() {
48 type Item = S::Item;
49
50 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 self.project().stream.poll_next(cx)
52 }
53
54 fn size_hint(&self) -> (usize, Option<usize>) {
55 self.stream.size_hint()
56 }
57}
58
59#[pinned_drop]
60impl<S, F> PinnedDrop for StreamGuard<S, F> where S: Stream, F: FnOnce() {
61 fn drop(mut self: Pin<&mut Self>) {
62 self.project().on_drop.take().expect("No on_drop function in StreamGuard, was drop called twice or constructed wrongly?")()
63 }
64}
65
66pub trait GuardStreamExt: Stream + Sized {
68 fn guard<F>(self, on_drop: F) -> StreamGuard<Self, F> where F: FnOnce();
70}
71
72impl<S> GuardStreamExt for S where S: Stream + Sized {
73 fn guard<F>(self, on_drop: F) -> StreamGuard<Self, F> where F: FnOnce() {
74 StreamGuard::new(self, on_drop)
75 }
76}