1#![doc = include_str!("../README.md")]
2
3use {
4 std::ops::DerefMut,
5 futures::future::Either,
6 crate::{
7 impls::*,
8 internal_prelude::*,
9 },
10};
11pub use crate::{
12 from_future::from_future,
13 pending::pending,
14 try_from_future::try_from_future,
15 try_unfold::try_unfold,
16 unfold::unfold,
17};
18
19mod from_future;
20mod impls;
21mod internal_prelude;
22mod pending;
23#[cfg(feature = "tokio-stream")] mod tokio_stream;
24mod try_from_future;
25mod try_unfold;
26mod unfold;
27
/// An asynchronous source of values that has no end-of-stream state.
///
/// The single required method, [`poll_next`](InfiniteStream::poll_next), returns
/// `Poll<Self::Item>` rather than `Poll<Option<Self::Item>>`: every poll either
/// produces an item or reports that none is ready yet.
///
/// The `#[must_use]` attribute makes the compiler warn when a value whose type is
/// `impl InfiniteStream` or a boxed `dyn InfiniteStream` is discarded, since the
/// only way to obtain items is to poll it.
#[must_use = "infinite streams do nothing unless polled"]
pub trait InfiniteStream {
    /// The type of value produced by each successful poll.
    type Item;

    /// Attempts to produce the next item of the stream.
    ///
    /// Returns `Poll::Ready(item)` when an item is available and `Poll::Pending`
    /// when it is not.
    ///
    /// NOTE(review): implementations presumably follow the usual `Future::poll`
    /// convention of registering the waker from `cx` before returning
    /// `Poll::Pending` — confirm against the concrete implementations.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item>;
}
33
34impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for &mut S {
35 type Item = S::Item;
36
37 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
38 S::poll_next(Pin::new(&mut **self), cx)
39 }
40}
41
42impl<P: DerefMut + Unpin> InfiniteStream for Pin<P>
43where P::Target: InfiniteStream {
44 type Item = <P::Target as InfiniteStream>::Item;
45
46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
47 self.get_mut().as_mut().poll_next(cx)
48 }
49}
50
51impl<S: InfiniteStream + Unpin + ?Sized> InfiniteStream for Box<S> {
52 type Item = S::Item;
53
54 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
55 Pin::new(&mut **self).poll_next(cx)
56 }
57}
58
59impl<A: InfiniteStream, B: InfiniteStream<Item = A::Item>> InfiniteStream for Either<A, B> {
60 type Item = A::Item;
61
62 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Item> {
63 match self.as_pin_mut() {
64 Either::Left(x) => x.poll_next(cx),
65 Either::Right(x) => x.poll_next(cx),
66 }
67 }
68}
69
70pub trait InfiniteStreamExt: InfiniteStream {
71 fn next(&mut self) -> Next<'_, Self>
72 where Self: Unpin {
73 assert_future::<Self::Item, _>(Next(self))
74 }
75
76 fn boxed<'a>(self) -> Pin<Box<dyn InfiniteStream<Item = Self::Item> + Send + 'a>>
77 where Self: Send + Sized + 'a {
78 assert_infinite_stream::<Self::Item, _>(Box::pin(self))
79 }
80
81 fn filter<Fut: Future<Output = bool>, F: FnMut(&Self::Item) -> Fut>(self, f: F) -> Filter<Self, Fut, F>
82 where Self: Sized {
83 assert_infinite_stream::<Self::Item, _>(Filter { stream: self, f, pending_fut: None, pending_item: None })
84 }
85
86 fn filter_map<T, Fut: Future<Output = Option<T>>, F: FnMut(Self::Item) -> Fut>(self, f: F) -> FilterMap<Self, Fut, F>
87 where Self: Sized {
88 assert_infinite_stream::<T, _>(FilterMap { stream: self, f, pending: None })
89 }
90
91 fn left_stream<B: InfiniteStream<Item = Self::Item>>(self) -> Either<Self, B>
92 where Self: Sized {
93 assert_infinite_stream::<Self::Item, _>(Either::Left(self))
94 }
95
96 fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
97 where Self: Sized {
98 assert_infinite_stream::<T, _>(Map { stream: self, f })
99 }
100
101 fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Item>
102 where Self: Unpin {
103 Pin::new(self).poll_next(cx)
104 }
105
106 fn right_stream<A: InfiniteStream<Item = Self::Item>>(self) -> Either<A, Self>
107 where Self: Sized {
108 assert_infinite_stream::<Self::Item, _>(Either::Right(self))
109 }
110
111 fn then<T, Fut: Future<Output = T>, F: FnMut(Self::Item) -> Fut>(self, f: F) -> Then<Self, Fut, F>
112 where Self: Sized {
113 assert_infinite_stream::<T, _>(Then { stream: self, fut: None, f })
114 }
115}
116
117impl<T: InfiniteStream + ?Sized> InfiniteStreamExt for T {}
118
119pub trait StreamExt: Stream {
120 fn chain_infinite<B: InfiniteStream<Item = Self::Item>>(self, second: B) -> Chain<Self, B>
121 where Self: Sized {
122 assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second })
123 }
124
125 fn chain_pending(self) -> Chain<Self, pending::Pending<Self::Item>>
127 where Self: Sized {
128 assert_infinite_stream::<Self::Item, _>(Chain { first: Some(self), second: pending() })
129 }
130
131 fn expect(self, msg: &str) -> Expect<'_, Self>
132 where Self: Sized {
133 assert_infinite_stream::<Self::Item, _>(Expect { stream: self, msg })
134 }
135}
136
137impl<T: Stream + ?Sized> StreamExt for T {}
138
/// Returns `future` unchanged.
///
/// Exists purely as a compile-time check that the argument is a [`Future`]
/// whose output type is `T`.
fn assert_future<T, Fut>(future: Fut) -> Fut
where
    Fut: Future<Output = T>,
{
    future
}
141
142fn assert_infinite_stream<T, S: InfiniteStream<Item = T>>(stream: S) -> S { stream }