stream_unconsume/
lib.rs

//! Poll a stream partially and get the emitted items back!

//! Sometimes it is useful to let someone poll your stream but be able to get it back as if it was never polled.
//! To do so, this crate provides the function `remember`, which, given a stream, returns a tuple of a future and a stream.
//! You can move the returned stream into any function, losing it, but when it gets dropped,
//! the future returned from `remember` is resolved with a new stream that provides all items that were consumed from
//! the original stream, as well as the rest of that stream, so you can use it as if you had never polled the original stream.
//!
//! You may specify any type to be used as the buffer, as long as it implements `IntoIterator<Item=Source::Item>` and
//! `Push<Source::Item>`, where `Source` is the type of the stream you want to unconsume.
//! There is a convenience function, `remember_vec`, that uses `Vec` as the buffer backend
//! (other backends may be `LinkedList`, `VecDeque` and so on).
//! We use `Push` instead of, say, `PushBack`, to let users ignore order if they wish.
//! For example, consider collecting a stream into a `HashSet`. If you do that by reducing the stream,
//! you will skip repeated elements right when they arrive. However, if you use our `remember_vec` function,
//! `StreamBuffer` will keep all these repeated elements, consuming memory,
//! only to let you drop them when you get your stream back.
//! Note that `Push` adds elements using such methods that, if the backend collection preserves insertion order
//! (e.g. `Vec`, but not `HashSet`), iteration over this collection will be in insertion order too. In other words,
//! if you use `LinkedList`, `Push` will perform `push_back`, not `push_front` as you may expect.
21
//! Note: Rust doesn't guarantee that `Drop` is ever called, so you may need to use a timeout when you await the returned future,
//! otherwise you may wait for it to resolve forever!
24
25use std::{mem::ManuallyDrop, pin::Pin};
26
27use futures::{
28    channel::oneshot::{self, Canceled, Sender},
29    stream::iter,
30    task::{Context, Poll},
31    Future, Stream, StreamExt,
32};
33use push_trait::Push;
34
35// We require Unpin here to be able to move stream out of ManuallyDrop in Drop
36pub struct StreamBuffer<B: Push<S::Item>, S: Stream + Unpin> {
37    // we need to extract these fields in destructor
38    // we use Option only for channel as we need to take it only once,
39    // while stream and buffer are used every poll, so checking Options to be not empty on every unwrap is not cheap
40    inner: ManuallyDrop<S>,
41    buffer: ManuallyDrop<B>,
42    tx: Option<Sender<(S, B)>>,
43}
44
45// S is Unpin, other fields are not pinned
46impl<S: Stream + Unpin, B: Push<S::Item>> Unpin for StreamBuffer<B, S> {}
47
48impl<S: Stream + Unpin, B: Push<S::Item> + Default> StreamBuffer<B, S> {
49    fn new(source: S, tx: Sender<(S, B)>) -> Self {
50        StreamBuffer {
51            inner: ManuallyDrop::new(source),
52            buffer: ManuallyDrop::new(B::default()),
53            tx: Some(tx),
54        }
55    }
56}
57
58impl<S: Stream + Unpin, B: Push<S::Item>> Stream for StreamBuffer<B, S>
59where
60    S::Item: Clone,
61{
62    type Item = S::Item;
63
64    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        let next = Pin::new(&mut *self.inner).poll_next(cx);
66        match &next {
67            Poll::Ready(Some(item)) => {
68                self.buffer.push((*item).clone());
69                next
70            }
71            _ => next,
72        }
73    }
74}
75
76// SAFETY: StreamBuffer<S> is Unpin so we may use self: Self, forgetting the fact that self is ever pinned
77impl<S: Stream + Unpin, B: Push<S::Item>> Drop for StreamBuffer<B, S> {
78    fn drop(&mut self) {
79        let tx = self.tx.take().expect("Sender is gone");
80        // SAFETY: we don't use inner nor buffer after this line, it is not touched by Drop too
81        // ignore error as we don't care if receiver no more interested in stream and buffer
82        let _ = tx.send((
83            // SAFETY: We required S to be Unpin, so here we can move it out of ManuallyDrop
84            unsafe { ManuallyDrop::take(&mut self.inner) },
85            // SAFETY: We don't need S::Item to be Unpin because we never pin them,
86            // and buffer can be moved out of ManuallyDrop because we never pin it
87            unsafe { ManuallyDrop::take(&mut self.buffer) },
88        ));
89        // we don't call ManuallyDrop on fields as they are moved to channel
90    }
91}
92
93/// Returns stream that remembers all produced items
94/// And resolves returned future with stream that behaves like original stream was never polled
95/// In other words, it lets partially consume stream and get all consumed items back
96pub fn remember<B: Push<S::Item> + IntoIterator<Item = S::Item> + Default, S: Stream + Unpin>(
97    source: S,
98) -> (
99    impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
100    impl Stream<Item = S::Item>,
101)
102where
103    S::Item: Clone,
104{
105    let (tx, rx) = oneshot::channel();
106    let fut = async {
107        let (tail, buffer) = rx.await?;
108        Ok(iter(buffer).chain(tail))
109    };
110    // fuse source stream to be able to poll it after finish (when we `chain` it with buffer)
111    (fut, StreamBuffer::<B, _>::new(source.fuse(), tx))
112}
113
114/// Convenience function to use `Vec` as buffer
115pub fn remember_vec<S: Stream + Unpin>(
116    source: S,
117) -> (
118    impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
119    impl Stream<Item = S::Item>,
120)
121where
122    S::Item: Clone,
123{
124    remember::<Vec<_>, _>(source)
125}
126
#[cfg(test)]
mod tests {
    use super::{remember, remember_vec};
    use futures::{channel::oneshot::Canceled, executor::block_on, future::ready, StreamExt};

    /// Mostly a compile-time check: `remember` must accept a stream whose items borrow
    /// from a local value, i.e. it must not demand `'static` from the source.
    #[test]
    fn check_lifetime_is_propagated_right() {
        let x = vec![String::new(); 3];
        // Items are `&String` borrowed from `x`, so `source` is not `'static`.
        let source = futures::stream::iter(x.iter());
        let (_buffer, _buffer_stream) = remember::<Vec<_>, _>(source);
    }

    #[test]
    fn test_consumed_values_are_present() {
        let x = vec![1, 2, 3];
        let source = futures::stream::iter(x.clone());
        let (buffer, buffer_stream) = remember::<Vec<_>, _>(source);
        let res = block_on(async {
            // consume first two items
            buffer_stream.take(2).for_each(|_| ready(())).await;
            let stream = buffer.await?;
            // first two items are still present after stream comes back
            assert_eq!(stream.collect::<Vec<_>>().await, x);
            Ok::<_, Canceled>(())
        });
        res.expect("stream and buffer must be sent back when the stream is dropped");
    }

    #[test]
    fn test_consumed_stream_becomes_empty_tail_and_dont_panic() {
        /// Iterator wrapper that panics when polled again after it returned `None`.
        struct UnfusedIter<I> {
            finished: bool,
            inner: I,
        }

        impl<I> UnfusedIter<I> {
            fn new(inner: I) -> UnfusedIter<I> {
                Self {
                    inner,
                    finished: false,
                }
            }
        }

        impl<I: Iterator> Iterator for UnfusedIter<I> {
            type Item = I::Item;

            fn next(&mut self) -> Option<Self::Item> {
                if self.finished {
                    panic!("Iterating over finished iterator");
                }
                let next = self.inner.next();
                if next.is_none() {
                    self.finished = true;
                }
                next
            }
        }

        let x = vec![1, 2, 3];
        // Here we want to emulate stream that panics on poll after finish
        let source = futures::stream::iter(UnfusedIter::new(x.clone().into_iter()));
        let (buffer, buffer_stream) = remember_vec(source);
        let res = block_on(async {
            // consume whole stream
            buffer_stream.for_each(|_| ready(())).await;
            let stream = buffer.await?;
            // consumed stream doesn't panic on consume after finish
            assert_eq!(stream.collect::<Vec<_>>().await, x);
            Ok::<_, Canceled>(())
        });
        res.expect("stream and buffer must be sent back when the stream is dropped");
    }
}