futures_util/stream/
flatten.rs1use futures_core::{Poll, Async, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5#[derive(Debug)]
10#[must_use = "streams do nothing unless polled"]
11pub struct Flatten<S>
12 where S: Stream,
13{
14 stream: S,
15 next: Option<S::Item>,
16}
17
18pub fn new<S>(s: S) -> Flatten<S>
19 where S: Stream,
20 S::Item: Stream<Error = S::Error>,
21{
22 Flatten {
23 stream: s,
24 next: None,
25 }
26}
27
28impl<S: Stream> Flatten<S> {
29 pub fn get_ref(&self) -> &S {
32 &self.stream
33 }
34
35 pub fn get_mut(&mut self) -> &mut S {
41 &mut self.stream
42 }
43
44 pub fn into_inner(self) -> S {
49 self.stream
50 }
51}
52
53impl<S> Sink for Flatten<S>
55 where S: Sink + Stream
56{
57 type SinkItem = S::SinkItem;
58 type SinkError = S::SinkError;
59
60 delegate_sink!(stream);
61}
62
63impl<S> Stream for Flatten<S>
64 where S: Stream,
65 S::Item: Stream,
66 <S::Item as Stream>::Error: From<S::Error>,
67{
68 type Item = <S::Item as Stream>::Item;
69 type Error = <S::Item as Stream>::Error;
70
71 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
72 loop {
73 if self.next.is_none() {
74 match try_ready!(self.stream.poll_next(cx)) {
75 Some(e) => self.next = Some(e),
76 None => return Ok(Async::Ready(None)),
77 }
78 }
79 assert!(self.next.is_some());
80 match self.next.as_mut().unwrap().poll_next(cx) {
81 Ok(Async::Ready(None)) => self.next = None,
82 other => return other,
83 }
84 }
85 }
86}