async_std/stream/stream/
cycle.rs

1use std::mem::ManuallyDrop;
2use std::pin::Pin;
3
4use crate::stream::Stream;
5use crate::task::{Context, Poll};
6
7/// A stream that will repeatedly yield the same list of elements.
8#[derive(Debug)]
9pub struct Cycle<S> {
10    orig: S,
11    source: ManuallyDrop<S>,
12}
13
14impl<S> Cycle<S>
15where
16    S: Stream + Clone,
17{
18    pub(crate) fn new(source: S) -> Self {
19        Self {
20            orig: source.clone(),
21            source: ManuallyDrop::new(source),
22        }
23    }
24}
25
26impl<S> Drop for Cycle<S> {
27    fn drop(&mut self) {
28        unsafe {
29            ManuallyDrop::drop(&mut self.source);
30        }
31    }
32}
33
34impl<S> Stream for Cycle<S>
35where
36    S: Stream + Clone,
37{
38    type Item = S::Item;
39
40    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41        unsafe {
42            let this = self.get_unchecked_mut();
43
44            match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) {
45                Some(item) => Poll::Ready(Some(item)),
46                None => {
47                    ManuallyDrop::drop(&mut this.source);
48                    this.source = ManuallyDrop::new(this.orig.clone());
49                    Pin::new_unchecked(&mut *this.source).poll_next(cx)
50                }
51            }
52        }
53    }
54}