stream_flatten_iters/
flatten_iters.rs1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_utils::unsafe_pinned;
6
7impl<T: ?Sized> StreamExt for T where T: Stream {}
8
9pub trait StreamExt: Stream {
11 fn flatten_iters(self) -> FlattenIters<Self>
13 where
14 Self::Item: IntoIterator,
15 Self: Sized,
16 {
17 FlattenIters::new(self)
18 }
19}
20
21#[must_use = "streams do nothing unless polled"]
24pub struct FlattenIters<St>
25where
26 St: Stream,
27 St::Item: IntoIterator,
28{
29 stream: St,
30 next: Option<<St::Item as IntoIterator>::IntoIter>,
31}
32
33impl<St> Unpin for FlattenIters<St>
34where
35 St: Stream + Unpin,
36 St::Item: IntoIterator,
37 <St::Item as IntoIterator>::IntoIter: Unpin,
38{
39}
40
41impl<St> FlattenIters<St>
42where
43 St: Stream,
44 St::Item: IntoIterator,
45{
46 unsafe_pinned!(stream: St);
47 unsafe_pinned!(next: Option<<St::Item as IntoIterator>::IntoIter>);
48}
49
50impl<St> FlattenIters<St>
51where
52 St: Stream,
53 St::Item: IntoIterator,
54{
55 pub(crate) fn new(stream: St) -> Self {
56 Self { stream, next: None }
57 }
58
59 pub fn get_ref(&self) -> &St {
62 &self.stream
63 }
64
65 pub fn get_mut(&mut self) -> &mut St {
71 &mut self.stream
72 }
73
74 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
80 self.stream()
81 }
82
83 pub fn into_inner(self) -> St {
88 self.stream
89 }
90}
91
92impl<St> FusedStream for FlattenIters<St>
93where
94 St: FusedStream,
95 St::Item: IntoIterator,
96 <St::Item as IntoIterator>::IntoIter: Unpin,
97{
98 fn is_terminated(&self) -> bool {
99 self.next.is_none() && self.stream.is_terminated()
100 }
101}
102
103impl<St> Stream for FlattenIters<St>
104where
105 St: Stream,
106 St::Item: IntoIterator,
107 <St::Item as IntoIterator>::IntoIter: Unpin,
108{
109 type Item = <St::Item as IntoIterator>::Item;
110
111 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 loop {
113 if self.next.is_none() {
114 match ready!(self.as_mut().stream().poll_next(cx)) {
115 Some(e) => self.as_mut().next().set(Some(e.into_iter())),
116 None => return Poll::Ready(None),
117 }
118 }
119
120 if let Some(item) = Option::as_mut(&mut self.as_mut().next()).unwrap().next() {
121 return Poll::Ready(Some(item));
122 } else {
123 self.as_mut().next().set(None);
124 }
125 }
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::StreamExt as _;
    use futures::stream::{iter, StreamExt};

    /// Elements come out in order across batch boundaries, and an empty
    /// batch in the middle contributes nothing.
    #[tokio::test]
    async fn test_basic() {
        let batches = vec![vec![0_usize, 1, 2], vec![3, 4], vec![], vec![5, 6, 7]];
        let mut stream = iter(batches).flatten_iters();

        for expected in 0_usize..=7 {
            assert_eq!(stream.next().await, Some(expected));
        }
        assert_eq!(stream.next().await, None);
    }

    /// Flattening a stream with no items ends immediately.
    #[tokio::test]
    async fn test_empty() {
        let batches: Vec<Vec<String>> = Vec::new();
        let mut stream = iter(batches).flatten_iters();

        assert_eq!(stream.next().await, None);
    }
}