completion/stream/adapters/
flatten.rs

1//! `FlatMap` and `Flatten`.
2
3use core::pin::Pin;
4use core::task::{Context, Poll};
5
6use completion_core::CompletionStream;
7use futures_core::{ready, Stream};
8use pin_project_lite::pin_project;
9
10use crate::CompletionStreamExt;
11
12use super::Map;
13
14pin_project! {
15    /// Stream for [`CompletionStreamExt::flat_map`].
16    #[derive(Debug, Clone)]
17    pub struct FlatMap<S: CompletionStream, U, F>
18    where
19        F: FnMut(S::Item) -> U,
20    {
21        #[pin]
22        inner: Flatten<Map<S, F>>,
23    }
24}
25
26impl<S: CompletionStream, U: CompletionStream, F: FnMut(S::Item) -> U> FlatMap<S, U, F> {
27    pub(crate) fn new(stream: S, f: F) -> Self {
28        Self {
29            inner: stream.map(f).flatten(),
30        }
31    }
32}
33
34impl<S, U, F> CompletionStream for FlatMap<S, U, F>
35where
36    S: CompletionStream,
37    U: CompletionStream,
38    F: FnMut(S::Item) -> U,
39{
40    type Item = U::Item;
41
42    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        self.project().inner.poll_next(cx)
44    }
45    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
46        self.project().inner.poll_cancel(cx)
47    }
48    fn size_hint(&self) -> (usize, Option<usize>) {
49        self.inner.size_hint()
50    }
51}
52
53impl<S, U, F> Stream for FlatMap<S, U, F>
54where
55    S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
56    U: CompletionStream + Stream<Item = <U as CompletionStream>::Item>,
57    F: FnMut(<S as CompletionStream>::Item) -> U,
58{
59    type Item = <Self as CompletionStream>::Item;
60    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61        #[inline]
62        fn assert_is_stream<T: Stream>() {}
63        assert_is_stream::<Flatten<Map<S, F>>>();
64
65        unsafe { CompletionStream::poll_next(self, cx) }
66    }
67    fn size_hint(&self) -> (usize, Option<usize>) {
68        CompletionStream::size_hint(self)
69    }
70}
71
72pin_project! {
73    /// Stream for [`CompletionStreamExt::flatten`].
74    #[derive(Debug, Clone)]
75    pub struct Flatten<S: CompletionStream> {
76        #[pin]
77        stream: S,
78        #[pin]
79        current: Option<S::Item>,
80    }
81}
82
83impl<S: CompletionStream> Flatten<S> {
84    pub(crate) fn new(stream: S) -> Self {
85        Self {
86            stream,
87            current: None,
88        }
89    }
90}
91
92impl<S, U> CompletionStream for Flatten<S>
93where
94    S: CompletionStream<Item = U>,
95    U: CompletionStream,
96{
97    type Item = U::Item;
98
99    unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100        let mut this = self.project();
101
102        Poll::Ready(loop {
103            if let Some(current) = this.current.as_mut().as_pin_mut() {
104                if let Some(item) = ready!(current.poll_next(cx)) {
105                    break Some(item);
106                }
107                this.current.set(None);
108            }
109
110            this.current
111                .set(Some(match ready!(this.stream.as_mut().poll_next(cx)) {
112                    Some(stream) => stream,
113                    None => break None,
114                }));
115        })
116    }
117
118    unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
119        let mut this = self.project();
120
121        if let Some(current) = this.current.as_mut().as_pin_mut() {
122            current.poll_cancel(cx)
123        } else {
124            this.stream.poll_cancel(cx)
125        }
126    }
127
128    fn size_hint(&self) -> (usize, Option<usize>) {
129        let (_, stream_upper) = self.stream.size_hint();
130        let (current_lower, current_upper) =
131            self.current.as_ref().map_or((0, Some(0)), U::size_hint);
132        (
133            current_lower,
134            if stream_upper == Some(0) {
135                current_upper
136            } else {
137                None
138            },
139        )
140    }
141}
142
143impl<S, U> Stream for Flatten<S>
144where
145    S: CompletionStream<Item = U> + Stream<Item = U>,
146    U: CompletionStream + Stream<Item = <U as CompletionStream>::Item>,
147{
148    type Item = <Self as CompletionStream>::Item;
149
150    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151        unsafe { CompletionStream::poll_next(self, cx) }
152    }
153    fn size_hint(&self) -> (usize, Option<usize>) {
154        CompletionStream::size_hint(self)
155    }
156}