Skip to main content

infinite_stream/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use {
4    std::ops::DerefMut,
5    futures::future::Either,
6    crate::{
7        impls::*,
8        internal_prelude::*,
9    },
10};
11pub use crate::{
12    from_future::from_future,
13    pending::pending,
14    try_from_future::try_from_future,
15    try_unfold::try_unfold,
16    unfold::unfold,
17};
18
19mod from_future;
20mod impls;
21mod internal_prelude;
22mod pending;
23#[cfg(feature = "tokio-stream")] mod tokio_stream;
24mod try_from_future;
25mod try_unfold;
26mod unfold;
27
28pub trait InfiniteStream {
29    type Item;
30
31    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
32}
33
34impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for &mut S {
35    type Item = S::Item;
36
37    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
38        S::poll_next(Pin::new(&mut **self), cx)
39    }
40}
41
42impl<P: DerefMut + Unpin> InfiniteStream for Pin<P>
43where P::Target: InfiniteStream {
44    type Item = <P::Target as InfiniteStream>::Item;
45
46    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
47        self.get_mut().as_mut().poll_next(cx)
48    }
49}
50
51impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for Box<S> {
52    type Item = S::Item;
53
54    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
55        Pin::new(&mut **self).poll_next(cx)
56    }
57}
58
59impl<A: InfiniteStream, B: InfiniteStream<Item = A::Item>> InfiniteStream for Either<A, B> {
60    type Item = A::Item;
61
62    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
63        match self.as_pin_mut() {
64            Either::Left(x) => x.poll_next(cx),
65            Either::Right(x) => x.poll_next(cx),
66        }
67    }
68}
69
70pub trait InfiniteStreamExt: InfiniteStream {
71    fn next(&mut self) -> Next<'_, Self>
72    where Self: Unpin {
73        assert_future::<Self::Item, _>(Next(self))
74    }
75
76    fn boxed<'a>(self) -> Pin<Box<dyn InfiniteStream<Item = Self::Item> + Send + 'a>>
77    where Self: Send + Sized + 'a {
78        assert_infinite_stream::<Self::Item, _>(Box::pin(self))
79    }
80
81    fn filter<Fut: Future<Output = bool>, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Filter<Self, Fut, F>
82    where Self: Sized {
83        assert_infinite_stream::<Self::Item, _>(Filter { stream: self, f, pending_fut: None, pending_item: None })
84    }
85
86    fn left_stream<B: InfiniteStream<Item = Self::Item>>(self) -> Either<Self, B>
87    where Self: Sized {
88        assert_infinite_stream::<Self::Item, _>(Either::Left(self))
89    }
90
91    fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
92    where Self: Sized {
93        assert_infinite_stream::<T, _>(Map { stream: self, f })
94    }
95
96    fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Item>
97    where Self: Unpin {
98        Pin::new(self).poll_next(cx)
99    }
100
101    fn right_stream<A: InfiniteStream<Item = Self::Item>>(self) -> Either<A, Self>
102    where Self: Sized {
103        assert_infinite_stream::<Self::Item, _>(Either::Right(self))
104    }
105
106    fn then<T, Fut: Future<Output = T>, F: FnMut(Self::Item) -> Fut>(self, f: F) -> Then<Self, Fut, F>
107    where Self: Sized {
108        assert_infinite_stream::<T, _>(Then { stream: self, fut: None, f })
109    }
110}
111
112impl<T: InfiniteStream + ?Sized> InfiniteStreamExt for T {}
113
114pub trait StreamExt: Stream {
115    fn chain_infinite<B: InfiniteStream<Item = Self::Item>>(self, second: B) -> Chain<Self, B>
116    where Self: Sized {
117        assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second })
118    }
119
120    /// Shorthand for `.chain_infinite(infinite_stream::pending())`.
121    fn chain_pending(self) -> Chain<Self, pending::Pending<Self::Item>>
122    where Self: Sized {
123        assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second: pending() })
124    }
125
126    fn expect(self, msg: &str) -> Expect<'_, Self>
127    where Self: Sized {
128        assert_infinite_stream::<Self::Item, _>(Expect { stream: self, msg })
129    }
130}
131
132impl<T: Stream + ?Sized> StreamExt for T {}
133
134/// Just a helper function to ensure the futures we're returning all have the right implementations.
135fn assert_future<T, Fut: Future<Output = T>>(future: Fut) -> Fut { future }
136
137/// Just a helper function to ensure the infinite streams we're returning all have the right implementations.
138fn assert_infinite_stream<T, S: InfiniteStream<Item = T>>(stream: S) -> S { stream }