async_std/stream/stream/
cycle.rs1use std::mem::ManuallyDrop;
2use std::pin::Pin;
3
4use crate::stream::Stream;
5use crate::task::{Context, Poll};
6
/// A stream that replays its source stream over and over.
///
/// Each time the stream currently being polled finishes, polling restarts
/// from a fresh clone of the stream this value was created with.
#[derive(Debug)]
pub struct Cycle<S> {
    /// Untouched copy of the stream passed at construction; it is only ever
    /// cloned to start a new pass.
    orig: S,
    /// The stream currently being polled. Wrapped in `ManuallyDrop`, so its
    /// destructor is run explicitly rather than by compiler-generated drop
    /// glue.
    source: ManuallyDrop<S>,
}
13
14impl<S> Cycle<S>
15where
16 S: Stream + Clone,
17{
18 pub(crate) fn new(source: S) -> Self {
19 Self {
20 orig: source.clone(),
21 source: ManuallyDrop::new(source),
22 }
23 }
24}
25
26impl<S> Drop for Cycle<S> {
27 fn drop(&mut self) {
28 unsafe {
29 ManuallyDrop::drop(&mut self.source);
30 }
31 }
32}
33
34impl<S> Stream for Cycle<S>
35where
36 S: Stream + Clone,
37{
38 type Item = S::Item;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 unsafe {
42 let this = self.get_unchecked_mut();
43
44 match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) {
45 Some(item) => Poll::Ready(Some(item)),
46 None => {
47 ManuallyDrop::drop(&mut this.source);
48 this.source = ManuallyDrop::new(this.orig.clone());
49 Pin::new_unchecked(&mut *this.source).poll_next(cx)
50 }
51 }
52 }
53 }
54}