1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//! Poll stream partially and get emitted items back!

//! Sometimes it is useful to let someone poll your stream but still be able to get it back as if it was never polled.
//! To do so, this crate provides the function `remember`, which, given a stream, returns a tuple of a future and a stream.
//! You can move the returned stream into any function, losing it, but when it gets dropped,
//! the future returned from `remember` is resolved with a new stream that provides all items that were consumed from
//! the original stream, as well as the rest of that stream, so you can use it as if you had never polled the original stream.
//!
//! You may specify any type to be used as the buffer, as long as it implements `IntoIterator<Item=Source::Item>`,
//! `Push<Source::Item>` and `Default`, where `Source` is the type of the stream you want to unconsume.
//! There is a convenience function, `remember_vec`, that uses `Vec` as the buffer backend
//! (other backends may be `LinkedList`, `VecDeque` and so on).
//! We use `Push` instead of, say, `PushBack`, to let users ignore order if they wish.
//! For example, consider collecting a stream into a `HashSet`. If you do that by reducing the stream,
//! you will skip repeated elements right when they arrive. However, if you use our `remember_vec` function,
//! `StreamBuffer` will keep all these repeated elements, consuming memory,
//! only for you to drop them when you get your stream back.
//! Note that `Push` adds elements in such a way that, if the backend collection preserves insertion order
//! (`Vec`, but not e.g. `HashSet`), iteration over this collection will be in insertion order too. In other words,
//! if you use `LinkedList`, `Push` will perform `push_back`, not `push_front` as you might expect.

//! Note: Rust doesn't guarantee that `Drop` is ever called, so you may need to use a timeout when you await the returned future,
//! otherwise you will wait for it to resolve forever!

use std::{mem::ManuallyDrop, pin::Pin};

use futures::{
    channel::oneshot::{self, Canceled, Sender},
    stream::iter,
    task::{Context, Poll},
    Future, Stream, StreamExt,
};
use push_trait::Push;

// `S: Unpin` is required so that the stream can be moved out of `ManuallyDrop` in `Drop`.
/// Stream adapter that keeps a clone of every item yielded by the wrapped stream.
///
/// When the adapter is dropped, the wrapped stream and the collected items are handed
/// over through the stored oneshot sender.
pub struct StreamBuffer<B, S>
where
    B: Push<S::Item>,
    S: Stream + Unpin,
{
    // `inner` and `buffer` have to be moved out in the destructor, hence `ManuallyDrop`.
    // Only the sender is wrapped in `Option`: it is taken exactly once, whereas the stream
    // and the buffer are touched on every poll, where an `Option` check per access is not free.
    inner: ManuallyDrop<S>,
    buffer: ManuallyDrop<B>,
    tx: Option<Sender<(S, B)>>,
}

// The wrapped stream is `Unpin` itself and none of the remaining fields is ever pinned,
// so the adapter as a whole can be `Unpin` regardless of the buffer type.
impl<B, S> Unpin for StreamBuffer<B, S>
where
    S: Stream + Unpin,
    B: Push<S::Item>,
{
}

impl<B, S> StreamBuffer<B, S>
where
    S: Stream + Unpin,
    B: Push<S::Item> + Default,
{
    /// Wraps `source` together with a default-constructed buffer and keeps `tx`
    /// around until the adapter is dropped.
    fn new(source: S, tx: Sender<(S, B)>) -> Self {
        let inner = ManuallyDrop::new(source);
        let buffer = ManuallyDrop::new(<B as Default>::default());
        Self {
            inner,
            buffer,
            tx: Some(tx),
        }
    }
}

impl<S: Stream + Unpin, B: Push<S::Item>> Stream for StreamBuffer<B, S>
where
    S::Item: Clone,
{
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = Pin::new(&mut *self.inner).poll_next(cx);
        match &next {
            Poll::Ready(Some(item)) => {
                self.buffer.push((*item).clone());
                next
            }
            _ => next,
        }
    }
}

// `StreamBuffer` is `Unpin`, so the destructor may treat `self` as a plain `&mut Self`
// and move its fields out even if the value was pinned at some point.
impl<S: Stream + Unpin, B: Push<S::Item>> Drop for StreamBuffer<B, S> {
    /// Hands the wrapped stream and the collected items over to the receiving side.
    ///
    /// Never panics: if the sender is unexpectedly missing, the stream and the buffer
    /// are simply dropped instead of being sent.
    fn drop(&mut self) {
        // SAFETY: `drop` runs at most once and this is the only place where `inner` is
        // taken out of its `ManuallyDrop`; the field is not accessed again afterwards.
        let inner = unsafe { ManuallyDrop::take(&mut self.inner) };
        // SAFETY: same reasoning as for `inner`; the buffer is never pinned, so moving
        // it out is fine as well.
        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) };

        if let Some(tx) = self.tx.take() {
            // A send error only means the receiver is no longer interested in the
            // stream and the buffer, so it is deliberately ignored.
            let _ = tx.send((inner, buffer));
        }
        // If there was no sender, `inner` and `buffer` are dropped here as ordinary
        // locals, so nothing is leaked.
    }
}

/// Splits `source` into a recording stream and a future that gives the stream back.
///
/// The returned stream yields the items of `source` while keeping a clone of each of them
/// in a buffer of type `B`. Once that stream is dropped, the returned future resolves with
/// a stream that first replays the buffered items and then continues with the rest of
/// `source`, i.e. it behaves as if the original stream had never been polled.
pub fn remember<B, S>(
    source: S,
) -> (
    impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
    impl Stream<Item = S::Item>,
)
where
    B: Push<S::Item> + IntoIterator<Item = S::Item> + Default,
    S: Stream + Unpin,
    S::Item: Clone,
{
    let (sender, receiver) = oneshot::channel();
    // The source is fused so that it can still be polled after it has finished,
    // which happens when it is chained behind the buffered items.
    let recorder = StreamBuffer::<B, _>::new(source.fuse(), sender);
    let restored = async move {
        receiver
            .await
            .map(|(tail, buffered)| iter(buffered).chain(tail))
    };
    (restored, recorder)
}

/// Shorthand for [`remember`] with a `Vec` as the buffer.
pub fn remember_vec<S>(
    source: S,
) -> (
    impl Future<Output = Result<impl Stream<Item = S::Item>, Canceled>>,
    impl Stream<Item = S::Item>,
)
where
    S: Stream + Unpin,
    S::Item: Clone,
{
    remember::<Vec<S::Item>, S>(source)
}

#[cfg(test)]
mod tests {
    use super::{remember, remember_vec};
    use futures::{channel::oneshot::Canceled, executor::block_on, future::ready, StreamExt};

    /// Mostly a compile-time check: `remember` must accept a stream that borrows from a
    /// local, so neither the source nor the returned stream may be required to be `'static`.
    #[test]
    fn check_lifetime_is_propagated_right() {
        let x = vec![String::new(); 3];
        // The items are references into `x`, so this stream is not `'static`.
        let source = futures::stream::iter(x.iter());
        let (_buffer, _buffer_stream) = remember::<Vec<_>, _>(source);
    }

    #[test]
    fn test_consumed_values_are_present() {
        let x = vec![1, 2, 3];
        let source = futures::stream::iter(x.clone().into_iter());
        let (buffer, buffer_stream) = remember::<Vec<_>, _>(source);
        let res = block_on(async {
            // consume first two items
            buffer_stream.take(2).for_each(|_| ready(())).await;
            let stream = buffer.await?;
            // first two items are still present after stream comes back
            assert_eq!(stream.collect::<Vec<_>>().await, x);
            Ok::<_, Canceled>(())
        });
        assert!(res.is_ok());
    }

    #[test]
    fn test_consumed_stream_becomes_empty_tail_and_dont_panic() {
        /// Iterator wrapper that panics when `next` is called again after it returned `None`.
        struct UnfusedIter<I> {
            finished: bool,
            inner: I,
        }
        impl<I> UnfusedIter<I> {
            fn new(inner: I) -> UnfusedIter<I> {
                Self {
                    inner,
                    finished: false,
                }
            }
        }

        impl<I: Iterator> Iterator for UnfusedIter<I> {
            type Item = I::Item;

            fn next(&mut self) -> Option<Self::Item> {
                if self.finished {
                    panic!("Iterating over finished iterator");
                } else {
                    let next = self.inner.next();
                    if next.is_none() {
                        self.finished = true;
                    }
                    next
                }
            }
        }
        let x = vec![1, 2, 3];
        // Here we want to emulate stream that panics on poll after finish
        let source = futures::stream::iter(UnfusedIter::new(x.clone().into_iter()));
        let (buffer, buffer_stream) = remember_vec(source);
        let res = block_on(async {
            // consume whole stream
            buffer_stream.for_each(|_| ready(())).await;
            let stream = buffer.await?;
            // consumed stream doesn't panic on consume after finish
            assert_eq!(stream.collect::<Vec<_>>().await, x);
            Ok::<_, Canceled>(())
        });
        assert!(res.is_ok());
    }
}