amadeus_core/pipe/
flatten.rs

1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use std::{
5	pin::Pin, task::{Context, Poll}
6};
7
8use super::Pipe;
9
10#[pin_project]
11#[derive(new)]
12pub struct Flatten<P, U> {
13	#[pin]
14	pipe: P,
15	#[pin]
16	#[new(default)]
17	next: Option<U>,
18}
19
20impl<P: Stream> Stream for Flatten<P, P::Item>
21where
22	P::Item: Stream,
23{
24	type Item = <P::Item as Stream>::Item;
25
26	#[inline]
27	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
28		let mut self_ = self.project();
29		Poll::Ready(loop {
30			if let Some(s) = self_.next.as_mut().as_pin_mut() {
31				if let Some(item) = ready!(s.poll_next(cx)) {
32					break Some(item);
33				}
34				self_.next.set(None);
35			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
36				self_.next.set(Some(s));
37			} else {
38				break None;
39			}
40		})
41	}
42}
43
44impl<P: Pipe<Input>, Input> Pipe<Input> for Flatten<P, P::Output>
45where
46	P::Output: Stream,
47{
48	type Output = <P::Output as Stream>::Item;
49
50	#[inline]
51	fn poll_next(
52		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
53	) -> Poll<Option<Self::Output>> {
54		let mut self_ = self.project();
55		Poll::Ready(loop {
56			if let Some(s) = self_.next.as_mut().as_pin_mut() {
57				if let Some(item) = ready!(s.poll_next(cx)) {
58					break Some(item);
59				}
60				self_.next.set(None);
61			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
62				self_.next.set(Some(s));
63			} else {
64				break None;
65			}
66		})
67	}
68}