amadeus_core/pipe/
flat_map.rs

1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use serde_closure::traits::FnMut;
5use std::{
6	pin::Pin, task::{Context, Poll}
7};
8
9use super::Pipe;
10
11#[pin_project]
12#[derive(new)]
13pub struct FlatMap<P, F, R> {
14	#[pin]
15	pipe: P,
16	f: F,
17	#[pin]
18	#[new(default)]
19	next: Option<R>,
20}
21
22impl<P: Stream, F, R> Stream for FlatMap<P, F, R>
23where
24	F: FnMut<(P::Item,), Output = R>,
25	R: Stream,
26{
27	type Item = R::Item;
28
29	#[inline]
30	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
31		let mut self_ = self.project();
32		Poll::Ready(loop {
33			if let Some(s) = self_.next.as_mut().as_pin_mut() {
34				if let Some(item) = ready!(s.poll_next(cx)) {
35					break Some(item);
36				}
37				self_.next.set(None);
38			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
39				self_.next.set(Some(self_.f.call_mut((s,))));
40			} else {
41				break None;
42			}
43		})
44	}
45}
46
47impl<P: Pipe<Input>, F, R, Input> Pipe<Input> for FlatMap<P, F, R>
48where
49	F: FnMut<(P::Output,), Output = R>,
50	R: Stream,
51{
52	type Output = R::Item;
53
54	#[inline]
55	fn poll_next(
56		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
57	) -> Poll<Option<Self::Output>> {
58		let mut self_ = self.project();
59		Poll::Ready(loop {
60			if let Some(s) = self_.next.as_mut().as_pin_mut() {
61				if let Some(item) = ready!(s.poll_next(cx)) {
62					break Some(item);
63				}
64				self_.next.set(None);
65			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
66				self_.next.set(Some(self_.f.call_mut((s,))));
67			} else {
68				break None;
69			}
70		})
71	}
72}