stream_unconsume/lib.rs
1//! Poll stream partially and get emitted items back!
2
3//! Sometimes it is useful to let someone poll you stream but be able to get it back as if it was never polled.
4//! To do so, this crate provides function `remember`, that, given stream, returns tuple of future and stream.
5//! You can move returned stream to any function, losing it, but, when it gets dropped,
6//! future, returned from `remember`, is resolved with new stream that provides all items that was consumed from
7//! original stream, as well as the rest of this stream, so you can use it as if you never poll original stream.
8//!
9//! You may specify any type to be used as buffer, as long as it implements `IntoIterator<Item=Source::Item>` and
10//! `Push<Source::Item>`, where `Source` is the type of stream you want to unconsume.
11//! There is convenience function `remember_vec`, to use `Vec` as buffer backend
12//! (other backends may be `LinkedList`, `VecDeque` and so on).
13//! We use `Push` instead of, say, `PushBack`, to let users ignore order if they wish.
14//! For example, consider if you collect stream to HashSet. If you do that reducing stream,
15//! you will skip repeating elements right when they arrive. However, if you use our `remember_vec` function,
16//! StreamBuffer will keep all these repeating elements, consuming memory,
17//! to let you just drop them when you get your stream back.
18//! Note, that `Push` will add elements using such methods, that, if backend collection preserver insertion order
19//! (`Vec`, but not i.e `HashSet`), iteration over this collection will be in insert order too. In other words,
20//! if you use `LinkedList`, `Push` will perform `push_back`, not `push_front` as you may expect.
21
22//! Note: Rust doesn't guarantee that Drop is ever called, so you may need to use timeout when you await returned future,
23//! otherwise you will wait for it's resolve forever!
24
25use std::{mem::ManuallyDrop, pin::Pin};
26
27use futures::{
28 channel::oneshot::{self, Canceled, Sender},
29 stream::iter,
30 task::{Context, Poll},
31 Future, Stream, StreamExt,
32};
33use push_trait::Push;
34
35// We require Unpin here to be able to move stream out of ManuallyDrop in Drop
36pub struct StreamBuffer<B: Push<S::Item>, S: Stream + Unpin> {
37 // we need to extract these fields in destructor
38 // we use Option only for channel as we need to take it only once,
39 // while stream and buffer are used every poll, so checking Options to be not empty on every unwrap is not cheap
40 inner: ManuallyDrop<S>,
41 buffer: ManuallyDrop<B>,
42 tx: Option<Sender<(S, B)>>,
43}
44
45// S is Unpin, other fields are not pinned
46impl<S: Stream + Unpin, B: Push<S::Item>> Unpin for StreamBuffer<B, S> {}
47
48impl<S: Stream + Unpin, B: Push<S::Item> + Default> StreamBuffer<B, S> {
49 fn new(source: S, tx: Sender<(S, B)>) -> Self {
50 StreamBuffer {
51 inner: ManuallyDrop::new(source),
52 buffer: ManuallyDrop::new(B::default()),
53 tx: Some(tx),
54 }
55 }
56}
57
58impl<S: Stream + Unpin, B: Push<S::Item>> Stream for StreamBuffer<B, S>
59where
60 S::Item: Clone,
61{
62 type Item = S::Item;
63
64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let next = Pin::new(&mut *self.inner).poll_next(cx);
66 match &next {
67 Poll::Ready(Some(item)) => {
68 self.buffer.push((*item).clone());
69 next
70 }
71 _ => next,
72 }
73 }
74}
75
76// SAFETY: StreamBuffer<S> is Unpin so we may use self: Self, forgetting the fact that self is ever pinned
77impl<S: Stream + Unpin, B: Push<S::Item>> Drop for StreamBuffer<B, S> {
78 fn drop(&mut self) {
79 let tx = self.tx.take().expect("Sender is gone");
80 // SAFETY: we don't use inner nor buffer after this line, it is not touched by Drop too
81 // ignore error as we don't care if receiver no more interested in stream and buffer
82 let _ = tx.send((
83 // SAFETY: We required S to be Unpin, so here we can move it out of ManuallyDrop
84 unsafe { ManuallyDrop::take(&mut self.inner) },
85 // SAFETY: We don't need S::Item to be Unpin because we never pin them,
86 // and buffer can be moved out of ManuallyDrop because we never pin it
87 unsafe { ManuallyDrop::take(&mut self.buffer) },
88 ));
89 // we don't call ManuallyDrop on fields as they are moved to channel
90 }
91}
92
93/// Returns stream that remembers all produced items
94/// And resolves returned future with stream that behaves like original stream was never polled
95/// In other words, it lets partially consume stream and get all consumed items back
96pub fn remember<B: Push<S::Item> + IntoIterator<Item = S::Item> + Default, S: Stream + Unpin>(
97 source: S,
98) -> (
99 impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
100 impl Stream<Item = S::Item>,
101)
102where
103 S::Item: Clone,
104{
105 let (tx, rx) = oneshot::channel();
106 let fut = async {
107 let (tail, buffer) = rx.await?;
108 Ok(iter(buffer).chain(tail))
109 };
110 // fuse source stream to be able to poll it after finish (when we `chain` it with buffer)
111 (fut, StreamBuffer::<B, _>::new(source.fuse(), tx))
112}
113
114/// Convenience function to use `Vec` as buffer
115pub fn remember_vec<S: Stream + Unpin>(
116 source: S,
117) -> (
118 impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
119 impl Stream<Item = S::Item>,
120)
121where
122 S::Item: Clone,
123{
124 remember::<Vec<_>, _>(source)
125}
126
127#[cfg(test)]
128mod tests {
129 use super::{remember, remember_vec};
130 use futures::{channel::oneshot::Canceled, executor::block_on, future::ready, StreamExt};
131
132 fn check_lifetime_is_propagated_right() {
133 fn assert_static<T: 'static>(_: &T) {}
134 let x = vec![String::new(); 3];
135 let source = futures::stream::iter(x.iter());
136 // assert_static(&source);
137 let (_buffer, _buffer_stream) = remember::<Vec<_>, _>(source);
138 // assert_static(&buffer_stream);
139 }
140
141 #[test]
142 fn test_consumed_values_are_present() {
143 let x = vec![1, 2, 3];
144 let source = futures::stream::iter(x.clone().into_iter());
145 let (buffer, buffer_stream) = remember::<Vec<_>, _>(source);
146 let res = block_on(async {
147 // consume first two items
148 buffer_stream.take(2).for_each(|_| ready(())).await;
149 let stream = buffer.await?;
150 // first two items are still present after stream comes back
151 assert_eq!(stream.collect::<Vec<_>>().await, x);
152 Ok::<_, Canceled>(())
153 });
154 assert!(res.is_ok());
155 }
156
157 #[test]
158 fn test_consumed_stream_becomes_empty_tail_and_dont_panic() {
159 struct UnfusedIter<I> {
160 finished: bool,
161 inner: I,
162 }
163 impl<I> UnfusedIter<I> {
164 fn new(inner: I) -> UnfusedIter<I> {
165 Self {
166 inner,
167 finished: false,
168 }
169 }
170 }
171
172 impl<I: Iterator> Iterator for UnfusedIter<I> {
173 type Item = I::Item;
174
175 fn next(&mut self) -> Option<Self::Item> {
176 if self.finished {
177 panic!("Iterating over finished iterator");
178 } else {
179 let next = self.inner.next();
180 if next.is_none() {
181 self.finished = true;
182 }
183 next
184 }
185 }
186 }
187 let x = vec![1, 2, 3];
188 // Here we want to emulate stream that panics on poll after finish
189 let source = futures::stream::iter(UnfusedIter::new(x.clone().into_iter()));
190 let (buffer, buffer_stream) = remember_vec(source);
191 let res = block_on(async {
192 // consume whole stream
193 buffer_stream.for_each(|_| ready(())).await;
194 let stream = buffer.await?;
195 // consumed stream doesn't panic on consume after finish
196 assert_eq!(stream.collect::<Vec<_>>().await, x);
197 Ok::<_, Canceled>(())
198 });
199 assert!(res.is_ok());
200 }
201}