stream_guard/
lib.rs

1//! A small RAII wrapper around a [`Stream`] that automatically invokes a
2//! user-defined action upon being dropped.
3//! 
4//! For example:
5//! 
6//! ```rust
7//! # use futures::stream::{self, StreamExt};
8//! # use stream_guard::GuardStreamExt;
9//! #
10//! async fn f() {
11//!     let mut s = stream::iter(0..3).guard(|| println!("Dropped!"));
12//!     while let Some(i) = s.next().await {
13//!         println!("{}", i);
14//!     }
15//! }
16//! ```
17//! 
18//! would print
19//! 
20//! ```plaintext
21//! 0
22//! 1
23//! 2
24//! Dropped!
25//! ```
26
27use std::{pin::Pin, task::{Context, Poll}};
28
29use futures::Stream;
30use pin_project::{pin_project, pinned_drop};
31
32/// A [`Stream`] wrapper that automatically runs a custom action when dropped.
33#[pin_project(PinnedDrop)]
34pub struct StreamGuard<S, F> where S: Stream, F: FnOnce() {
35    #[pin]
36    stream: S,
37    on_drop: Option<F>,
38}
39
40impl<S, F> StreamGuard<S, F> where S: Stream, F: FnOnce() {
41    /// Wraps the given [`Stream`], running the given closure upon being dropped.
42    pub fn new(stream: S, on_drop: F) -> Self {
43        Self { stream, on_drop: Some(on_drop) }
44    }
45}
46
47impl<S, F> Stream for StreamGuard<S, F> where S: Stream, F: FnOnce() {
48    type Item = S::Item;
49
50    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        self.project().stream.poll_next(cx)
52    }
53
54    fn size_hint(&self) -> (usize, Option<usize>) {
55        self.stream.size_hint()
56    }
57}
58
59#[pinned_drop]
60impl<S, F> PinnedDrop for StreamGuard<S, F> where S: Stream, F: FnOnce() {
61    fn drop(mut self: Pin<&mut Self>) {
62        self.project().on_drop.take().expect("No on_drop function in StreamGuard, was drop called twice or constructed wrongly?")()
63    }
64}
65
66/// A convenience extension for creating a [`StreamGuard`] via a method.
67pub trait GuardStreamExt: Stream + Sized {
68    /// Wraps the [`Stream`], running the given closure upon being dropped.
69    fn guard<F>(self, on_drop: F) -> StreamGuard<Self, F> where F: FnOnce();
70}
71
72impl<S> GuardStreamExt for S where S: Stream + Sized {
73    fn guard<F>(self, on_drop: F) -> StreamGuard<Self, F> where F: FnOnce() {
74        StreamGuard::new(self, on_drop)
75    }
76}