amadeus_core/pipe/
flatten.rs1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use std::{
5 pin::Pin, task::{Context, Poll}
6};
7
8use super::Pipe;
9
/// Adaptor that flattens a source of streams into a single sequence of items.
///
/// `P` is the outer source and `U` is the type of the inner streams it
/// produces. The trait impls in this file only apply when `U` is the outer
/// source's item/output type.
#[pin_project]
#[derive(new)]
pub struct Flatten<P, U> {
    /// The outer source that yields the inner streams.
    #[pin]
    pipe: P,
    /// The inner stream currently being drained, if any.
    ///
    /// Marked `#[new(default)]`, so the derived constructor does not take it
    /// as an argument and it starts out as `None`.
    #[pin]
    #[new(default)]
    next: Option<U>,
}
19
20impl<P: Stream> Stream for Flatten<P, P::Item>
21where
22 P::Item: Stream,
23{
24 type Item = <P::Item as Stream>::Item;
25
26 #[inline]
27 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
28 let mut self_ = self.project();
29 Poll::Ready(loop {
30 if let Some(s) = self_.next.as_mut().as_pin_mut() {
31 if let Some(item) = ready!(s.poll_next(cx)) {
32 break Some(item);
33 }
34 self_.next.set(None);
35 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
36 self_.next.set(Some(s));
37 } else {
38 break None;
39 }
40 })
41 }
42}
43
44impl<P: Pipe<Input>, Input> Pipe<Input> for Flatten<P, P::Output>
45where
46 P::Output: Stream,
47{
48 type Output = <P::Output as Stream>::Item;
49
50 #[inline]
51 fn poll_next(
52 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
53 ) -> Poll<Option<Self::Output>> {
54 let mut self_ = self.project();
55 Poll::Ready(loop {
56 if let Some(s) = self_.next.as_mut().as_pin_mut() {
57 if let Some(item) = ready!(s.poll_next(cx)) {
58 break Some(item);
59 }
60 self_.next.set(None);
61 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
62 self_.next.set(Some(s));
63 } else {
64 break None;
65 }
66 })
67 }
68}