amadeus_core/pipe/
flat_map_sync.rs1use derive_new::new;
2use futures::{ready, Stream};
3use pin_project::pin_project;
4use serde_closure::traits::FnMut;
5use std::{
6 pin::Pin, task::{Context, Poll}
7};
8
9use super::Pipe;
10
11#[pin_project]
12#[derive(new)]
13pub struct FlatMapSync<P, F, R> {
14 #[pin]
15 pipe: P,
16 f: F,
17 #[new(default)]
18 next: Option<R>,
19}
20
21impl<P: Stream, F, R> Stream for FlatMapSync<P, F, R>
22where
23 F: FnMut<(P::Item,), Output = R>,
24 R: Iterator,
25{
26 type Item = R::Item;
27
28 #[inline]
29 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
30 let mut self_ = self.project();
31 Poll::Ready(loop {
32 if let Some(s) = self_.next.as_mut() {
33 if let Some(item) = s.next() {
34 break Some(item);
35 }
36 *self_.next = None;
37 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx)) {
38 *self_.next = Some(self_.f.call_mut((s,)));
39 } else {
40 break None;
41 }
42 })
43 }
44}
45
46impl<P: Pipe<Input>, F, R, Input> Pipe<Input> for FlatMapSync<P, F, R>
47where
48 F: FnMut<(P::Output,), Output = R>,
49 R: Iterator,
50{
51 type Output = R::Item;
52
53 #[inline]
54 fn poll_next(
55 self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Input>>,
56 ) -> Poll<Option<Self::Output>> {
57 let mut self_ = self.project();
58 Poll::Ready(loop {
59 if let Some(s) = self_.next.as_mut() {
60 if let Some(item) = s.next() {
61 break Some(item);
62 }
63 *self_.next = None;
64 } else if let Some(s) = ready!(self_.pipe.as_mut().poll_next(cx, stream.as_mut())) {
65 *self_.next = Some(self_.f.call_mut((s,)));
66 } else {
67 break None;
68 }
69 })
70 }
71}