infinite_stream/
lib.rs

1use {
2    std::ops::DerefMut,
3    futures::future::Either,
4    crate::{
5        impls::*,
6        internal_prelude::*,
7    },
8};
9pub use crate::{
10    from_future::from_future,
11    pending::pending,
12    try_from_future::try_from_future,
13    try_unfold::try_unfold,
14    unfold::unfold,
15};
16
17mod from_future;
18mod impls;
19mod internal_prelude;
20mod pending;
21#[cfg(feature = "tokio-stream")] mod tokio_stream;
22mod try_from_future;
23mod try_unfold;
24mod unfold;
25
/// An asynchronous stream of values that has no way to signal termination.
///
/// Unlike a stream whose poll method returns `Poll<Option<Item>>`, [`poll_next`](InfiniteStream::poll_next)
/// returns `Poll<Self::Item>` directly, so a poll either produces a value or reports `Pending`.
#[must_use = "streams do nothing unless polled"]
pub trait InfiniteStream {
    /// The type of value produced by each successful poll.
    type Item;

    /// Attempts to pull the next value out of this stream.
    ///
    /// Returns `Poll::Ready(item)` when a value is available and `Poll::Pending` when it is not.
    /// Implementations are expected to follow the usual `Poll` convention of arranging for the
    /// waker in `cx` to be notified before returning `Pending`; nothing in this trait enforces that.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}
31
32impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for &mut S {
33    type Item = S::Item;
34
35    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
36        S::poll_next(Pin::new(&mut **self), cx)
37    }
38}
39
40impl<P: DerefMut + Unpin> InfiniteStream for Pin<P>
41where P::Target: InfiniteStream {
42    type Item = <P::Target as InfiniteStream>::Item;
43
44    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
45        self.get_mut().as_mut().poll_next(cx)
46    }
47}
48
49impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for Box<S> {
50    type Item = S::Item;
51
52    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
53        Pin::new(&mut **self).poll_next(cx)
54    }
55}
56
57impl<A: InfiniteStream, B: InfiniteStream<Item = A::Item>> InfiniteStream for Either<A, B> {
58    type Item = A::Item;
59
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
61        match self.as_pin_mut() {
62            Either::Left(x) => x.poll_next(cx),
63            Either::Right(x) => x.poll_next(cx),
64        }
65    }
66}
67
68pub trait InfiniteStreamExt: InfiniteStream {
69    fn next(&mut self) -> Next<'_, Self>
70    where Self: Unpin {
71        assert_future::<Self::Item, _>(Next(self))
72    }
73
74    fn boxed<'a>(self) -> Pin<Box<dyn InfiniteStream<Item = Self::Item> + Send + 'a>>
75    where Self: Send + Sized + 'a {
76        assert_infinite_stream::<Self::Item, _>(Box::pin(self))
77    }
78
79    fn filter<Fut: Future<Output = bool>, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Filter<Self, Fut, F>
80    where Self: Sized {
81        assert_infinite_stream::<Self::Item, _>(Filter { stream: self, f, pending_fut: None, pending_item: None })
82    }
83
84    fn left_stream<B: InfiniteStream<Item = Self::Item>>(self) -> Either<Self, B>
85    where Self: Sized {
86        assert_infinite_stream::<Self::Item, _>(Either::Left(self))
87    }
88
89    fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
90    where Self: Sized {
91        assert_infinite_stream::<T, _>(Map { stream: self, f })
92    }
93
94    fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Item>
95    where Self: Unpin {
96        Pin::new(self).poll_next(cx)
97    }
98
99    fn right_stream<A: InfiniteStream<Item = Self::Item>>(self) -> Either<A, Self>
100    where Self: Sized {
101        assert_infinite_stream::<Self::Item, _>(Either::Right(self))
102    }
103}
104
105impl<T: InfiniteStream + ?Sized> InfiniteStreamExt for T {}
106
107pub trait StreamExt: Stream {
108    fn chain_infinite<B: InfiniteStream<Item = Self::Item>>(self, second: B) -> Chain<Self, B>
109    where Self: Sized {
110        assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second })
111    }
112
113    /// Shorthand for `.chain_infinite(infinite_stream::pending())`.
114    fn chain_pending(self) -> Chain<Self, pending::Pending<Self::Item>>
115    where Self: Sized {
116        assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second: pending() })
117    }
118
119    fn expect(self, msg: &str) -> Expect<'_, Self>
120    where Self: Sized {
121        assert_infinite_stream::<Self::Item, _>(Expect { stream: self, msg })
122    }
123}
124
125impl<T: Stream + ?Sized> StreamExt for T {}
126
/// Just a helper function to ensure the futures we're returning all have the right implementations.
///
/// Returns `future` unchanged. The bound `Fut: Future<Output = T>` makes the call a compile-time
/// check that the value is a future with the stated output type.
fn assert_future<T, Fut: Future<Output = T>>(future: Fut) -> Fut {
    future
}
129
130/// Just a helper function to ensure the infinite streams we're returning all have the right implementations.
131fn assert_infinite_stream<T, S: InfiniteStream<Item = T>>(stream: S) -> S { stream }