amadeus_core/pipe/
flat_map_sync.rs

1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use serde_closure::traits::FnMut;
5use std::{
6	pin::Pin, task::{Context, Poll}
7};
8
9use super::Pipe;
10
11#[pin_project]
12#[derive(new)]
13pub struct FlatMapSync<P, F, R> {
14	#[pin]
15	pipe: P,
16	f: F,
17	#[new(default)]
18	next: Option<R>,
19}
20
21impl<P: Stream, F, R> Stream for FlatMapSync<P, F, R>
22where
23	F: FnMut<(P::Item,), Output = R>,
24	R: Iterator,
25{
26	type Item = R::Item;
27
28	#[inline]
29	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
30		let mut self_ = self.project();
31		Poll::Ready(loop {
32			if let Some(s) = self_.next.as_mut() {
33				if let Some(item) = s.next() {
34					break Some(item);
35				}
36				*self_.next = None;
37			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
38				*self_.next = Some(self_.f.call_mut((s,)));
39			} else {
40				break None;
41			}
42		})
43	}
44}
45
46impl<P: Pipe<Input>, F, R, Input> Pipe<Input> for FlatMapSync<P, F, R>
47where
48	F: FnMut<(P::Output,), Output = R>,
49	R: Iterator,
50{
51	type Output = R::Item;
52
53	#[inline]
54	fn poll_next(
55		self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
56	) -> Poll<Option<Self::Output>> {
57		let mut self_ = self.project();
58		Poll::Ready(loop {
59			if let Some(s) = self_.next.as_mut() {
60				if let Some(item) = s.next() {
61					break Some(item);
62				}
63				*self_.next = None;
64			} else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
65				*self_.next = Some(self_.f.call_mut((s,)));
66			} else {
67				break None;
68			}
69		})
70	}
71}