stream_flatten_iters/
flatten_iters.rs

1use core::pin::Pin;
2use futures_core::ready;
3use futures_core::stream::{FusedStream, Stream};
4use futures_core::task::{Context, Poll};
5use pin_utils::unsafe_pinned;
6
7impl<T: ?Sized> StreamExt for T where T: Stream {}
8
9/// An extension trait for Streams that provides a variety of convenient combinator functions.
10pub trait StreamExt: Stream {
11    /// Flattens a stream of iterators into one continuous stream.
12    fn flatten_iters(self) -> FlattenIters<Self>
13    where
14        Self::Item: IntoIterator,
15        Self: Sized,
16    {
17        FlattenIters::new(self)
18    }
19}
20
21/// Stream for the [`flatten_iters`](StreamExt::flatten_iters) method.
22// #[derive(Debug)]
23#[must_use = "streams do nothing unless polled"]
24pub struct FlattenIters<St>
25where
26    St: Stream,
27    St::Item: IntoIterator,
28{
29    stream: St,
30    next: Option<<St::Item as IntoIterator>::IntoIter>,
31}
32
33impl<St> Unpin for FlattenIters<St>
34where
35    St: Stream + Unpin,
36    St::Item: IntoIterator,
37    <St::Item as IntoIterator>::IntoIter: Unpin,
38{
39}
40
41impl<St> FlattenIters<St>
42where
43    St: Stream,
44    St::Item: IntoIterator,
45{
46    unsafe_pinned!(stream: St);
47    unsafe_pinned!(next: Option<<St::Item as IntoIterator>::IntoIter>);
48}
49
50impl<St> FlattenIters<St>
51where
52    St: Stream,
53    St::Item: IntoIterator,
54{
55    pub(crate) fn new(stream: St) -> Self {
56        Self { stream, next: None }
57    }
58
59    /// Acquires a reference to the underlying stream that this combinator is
60    /// pulling from.
61    pub fn get_ref(&self) -> &St {
62        &self.stream
63    }
64
65    /// Acquires a mutable reference to the underlying stream that this
66    /// combinator is pulling from.
67    ///
68    /// Note that care must be taken to avoid tampering with the state of the
69    /// stream which may otherwise confuse this combinator.
70    pub fn get_mut(&mut self) -> &mut St {
71        &mut self.stream
72    }
73
74    /// Acquires a pinned mutable reference to the underlying stream that this
75    /// combinator is pulling from.
76    ///
77    /// Note that care must be taken to avoid tampering with the state of the
78    /// stream which may otherwise confuse this combinator.
79    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
80        self.stream()
81    }
82
83    /// Consumes this combinator, returning the underlying stream.
84    ///
85    /// Note that this may discard intermediate state of this combinator, so
86    /// care should be taken to avoid losing resources when this is called.
87    pub fn into_inner(self) -> St {
88        self.stream
89    }
90}
91
92impl<St> FusedStream for FlattenIters<St>
93where
94    St: FusedStream,
95    St::Item: IntoIterator,
96    <St::Item as IntoIterator>::IntoIter: Unpin,
97{
98    fn is_terminated(&self) -> bool {
99        self.next.is_none() && self.stream.is_terminated()
100    }
101}
102
103impl<St> Stream for FlattenIters<St>
104where
105    St: Stream,
106    St::Item: IntoIterator,
107    <St::Item as IntoIterator>::IntoIter: Unpin,
108{
109    type Item = <St::Item as IntoIterator>::Item;
110
111    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        loop {
113            if self.next.is_none() {
114                match ready!(self.as_mut().stream().poll_next(cx)) {
115                    Some(e) => self.as_mut().next().set(Some(e.into_iter())),
116                    None => return Poll::Ready(None),
117                }
118            }
119
120            if let Some(item) = Option::as_mut(&mut self.as_mut().next()).unwrap().next() {
121                return Poll::Ready(Some(item));
122            } else {
123                self.as_mut().next().set(None);
124            }
125        }
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::StreamExt as _;
132    use futures::stream::{iter, StreamExt};
133
134    #[tokio::test]
135    async fn test_basic() {
136        let mut stream =
137            iter(vec![vec![0_usize, 1, 2], vec![3, 4], vec![], vec![5, 6, 7]]).flatten_iters();
138
139        assert_eq!(stream.next().await, Some(0));
140        assert_eq!(stream.next().await, Some(1));
141        assert_eq!(stream.next().await, Some(2));
142        assert_eq!(stream.next().await, Some(3));
143        assert_eq!(stream.next().await, Some(4));
144        assert_eq!(stream.next().await, Some(5));
145        assert_eq!(stream.next().await, Some(6));
146        assert_eq!(stream.next().await, Some(7));
147        assert_eq!(stream.next().await, None);
148    }
149
150    #[tokio::test]
151    async fn test_empty() {
152        let mut stream = iter(Vec::<Vec<String>>::new()).flatten_iters();
153
154        assert_eq!(stream.next().await, None);
155    }
156}