completion/stream/adapters/
flatten.rs1use core::pin::Pin;
4use core::task::{Context, Poll};
5
6use completion_core::CompletionStream;
7use futures_core::{ready, Stream};
8use pin_project_lite::pin_project;
9
10use crate::CompletionStreamExt;
11
12use super::Map;
13
14pin_project! {
15 #[derive(Debug, Clone)]
17 pub struct FlatMap<S: CompletionStream, U, F>
18 where
19 F: FnMut(S::Item) -> U,
20 {
21 #[pin]
22 inner: Flatten<Map<S, F>>,
23 }
24}
25
26impl<S: CompletionStream, U: CompletionStream, F: FnMut(S::Item) -> U> FlatMap<S, U, F> {
27 pub(crate) fn new(stream: S, f: F) -> Self {
28 Self {
29 inner: stream.map(f).flatten(),
30 }
31 }
32}
33
34impl<S, U, F> CompletionStream for FlatMap<S, U, F>
35where
36 S: CompletionStream,
37 U: CompletionStream,
38 F: FnMut(S::Item) -> U,
39{
40 type Item = U::Item;
41
42 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43 self.project().inner.poll_next(cx)
44 }
45 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
46 self.project().inner.poll_cancel(cx)
47 }
48 fn size_hint(&self) -> (usize, Option<usize>) {
49 self.inner.size_hint()
50 }
51}
52
53impl<S, U, F> Stream for FlatMap<S, U, F>
54where
55 S: CompletionStream + Stream<Item = <S as CompletionStream>::Item>,
56 U: CompletionStream + Stream<Item = <U as CompletionStream>::Item>,
57 F: FnMut(<S as CompletionStream>::Item) -> U,
58{
59 type Item = <Self as CompletionStream>::Item;
60 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
61 #[inline]
62 fn assert_is_stream<T: Stream>() {}
63 assert_is_stream::<Flatten<Map<S, F>>>();
64
65 unsafe { CompletionStream::poll_next(self, cx) }
66 }
67 fn size_hint(&self) -> (usize, Option<usize>) {
68 CompletionStream::size_hint(self)
69 }
70}
71
72pin_project! {
73 #[derive(Debug, Clone)]
75 pub struct Flatten<S: CompletionStream> {
76 #[pin]
77 stream: S,
78 #[pin]
79 current: Option<S::Item>,
80 }
81}
82
83impl<S: CompletionStream> Flatten<S> {
84 pub(crate) fn new(stream: S) -> Self {
85 Self {
86 stream,
87 current: None,
88 }
89 }
90}
91
92impl<S, U> CompletionStream for Flatten<S>
93where
94 S: CompletionStream<Item = U>,
95 U: CompletionStream,
96{
97 type Item = U::Item;
98
99 unsafe fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100 let mut this = self.project();
101
102 Poll::Ready(loop {
103 if let Some(current) = this.current.as_mut().as_pin_mut() {
104 if let Some(item) = ready!(current.poll_next(cx)) {
105 break Some(item);
106 }
107 this.current.set(None);
108 }
109
110 this.current
111 .set(Some(match ready!(this.stream.as_mut().poll_next(cx)) {
112 Some(stream) => stream,
113 None => break None,
114 }));
115 })
116 }
117
118 unsafe fn poll_cancel(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
119 let mut this = self.project();
120
121 if let Some(current) = this.current.as_mut().as_pin_mut() {
122 current.poll_cancel(cx)
123 } else {
124 this.stream.poll_cancel(cx)
125 }
126 }
127
128 fn size_hint(&self) -> (usize, Option<usize>) {
129 let (_, stream_upper) = self.stream.size_hint();
130 let (current_lower, current_upper) =
131 self.current.as_ref().map_or((0, Some(0)), U::size_hint);
132 (
133 current_lower,
134 if stream_upper == Some(0) {
135 current_upper
136 } else {
137 None
138 },
139 )
140 }
141}
142
143impl<S, U> Stream for Flatten<S>
144where
145 S: CompletionStream<Item = U> + Stream<Item = U>,
146 U: CompletionStream + Stream<Item = <U as CompletionStream>::Item>,
147{
148 type Item = <Self as CompletionStream>::Item;
149
150 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
151 unsafe { CompletionStream::poll_next(self, cx) }
152 }
153 fn size_hint(&self) -> (usize, Option<usize>) {
154 CompletionStream::size_hint(self)
155 }
156}