amadeus_core/pipe/
flat_map.rs1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use serde_closure::traits::FnMut;
5use std::{
6 pin::Pin, task::{Context, Poll}
7};
8
9use super::Pipe;
10
/// Adaptor that applies `f` to every item produced by `pipe`, treats each
/// result as a stream of type `R`, and yields the items of those streams one
/// stream at a time.
///
/// The derived `new` constructor does not take `next`; that field is marked
/// `#[new(default)]` and therefore starts out as its `Default` value.
#[pin_project]
#[derive(new)]
pub struct FlatMap<P, F, R> {
    /// Upstream source whose items are handed to `f`. Pinned structurally so
    /// it can be polled through the projection.
    #[pin]
    pipe: P,
    /// Closure invoked once per upstream item to produce the next sub-stream.
    f: F,
    /// Sub-stream currently being drained, or `None` when a new one has to be
    /// obtained from `pipe`. Pinned structurally because it is polled in place.
    #[pin]
    #[new(default)]
    next: Option<R>,
}
21
22impl<P: Stream, F, R> Stream for FlatMap<P, F, R>
23where
24 F: FnMut<(P::Item,), Output = R>,
25 R: Stream,
26{
27 type Item = R::Item;
28
29 #[inline]
30 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
31 let mut self_ = self.project();
32 Poll::Ready(loop {
33 if let Some(s) = self_.next.as_mut().as_pin_mut() {
34 if let Some(item) = ready!(s.poll_next(cx)) {
35 break Some(item);
36 }
37 self_.next.set(None);
38 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
39 self_.next.set(Some(self_.f.call_mut((s,))));
40 } else {
41 break None;
42 }
43 })
44 }
45}
46
47impl<P: Pipe<Input>, F, R, Input> Pipe<Input> for FlatMap<P, F, R>
48where
49 F: FnMut<(P::Output,), Output = R>,
50 R: Stream,
51{
52 type Output = R::Item;
53
54 #[inline]
55 fn poll_next(
56 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
57 ) -> Poll<Option<Self::Output>> {
58 let mut self_ = self.project();
59 Poll::Ready(loop {
60 if let Some(s) = self_.next.as_mut().as_pin_mut() {
61 if let Some(item) = ready!(s.poll_next(cx)) {
62 break Some(item);
63 }
64 self_.next.set(None);
65 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
66 self_.next.set(Some(self_.f.call_mut((s,))));
67 } else {
68 break None;
69 }
70 })
71 }
72}