1mod filter;
2mod filter_map_sync;
3mod flat_map;
4mod flat_map_sync;
5mod flatten;
6mod map;
7
8use derive_new::new;
9use futures::{pin_mut, stream, Future, Stream};
10use pin_project::pin_project;
11use std::{
12 marker::PhantomData, mem, ops::DerefMut, pin::Pin, task::{Context, Poll}
13};
14
15pub use self::{filter::*, filter_map_sync::*, flat_map::*, flat_map_sync::*, flatten::*, map::*};
16
17pub trait StreamExt: Stream {
22 #[inline(always)]
23 fn pipe<P>(self, pipe: P) -> StreamPipe<Self, P>
24 where
25 P: Pipe<Self::Item>,
26 Self: Sized,
27 {
28 assert_stream(StreamPipe { stream: self, pipe })
29 }
30
31 #[inline(always)]
32 fn sink<S>(self, sink: S) -> StreamSink<Self, S>
33 where
34 S: Sink<Self::Item>,
35 Self: Sized,
36 {
37 assert_future(StreamSink { stream: self, sink })
38 }
39}
40impl<S: ?Sized> StreamExt for S where S: Stream {}
41
42pub trait Pipe<Input> {
43 type Output;
44
45 fn poll_next(
46 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
47 ) -> Poll<Option<Self::Output>>;
48
49 #[inline(always)]
50 fn pipe<P>(self, pipe: P) -> PipePipe<Self, P>
51 where
52 P: Pipe<Self::Output>,
53 Self: Sized,
54 {
55 assert_pipe(PipePipe { a: self, b: pipe })
56 }
57
58 #[inline(always)]
59 fn sink<S>(self, sink: S) -> PipeSink<Self, S>
60 where
61 S: Sink<Self::Output>,
62 Self: Sized,
63 {
64 assert_sink(PipeSink { pipe: self, sink })
65 }
66
67 #[inline(always)]
68 fn filter<F>(self, f: F) -> Filter<Self, F>
69 where
70 F: FnMut(&Self::Output) -> bool,
71 Self: Sized,
72 {
73 assert_pipe(Filter::new(self, f))
74 }
75
76 #[inline(always)]
77 fn flat_map<F, R>(self, f: F) -> FlatMap<Self, F, R>
78 where
79 F: FnMut(Self::Output) -> R,
80 R: Stream,
81 Self: Sized,
82 {
83 assert_pipe(FlatMap::new(self, f))
84 }
85
86 #[inline(always)]
87 fn flatten(self) -> Flatten<Self, Self::Output>
88 where
89 Self::Output: Stream,
90 Self: Sized,
91 {
92 assert_pipe(Flatten::new(self))
93 }
94
95 #[inline(always)]
96 fn map<F, R>(self, f: F) -> Map<Self, F>
97 where
98 F: FnMut(Self::Output) -> R,
99 Self: Sized,
100 {
101 assert_pipe(Map::new(self, f))
102 }
103}
104
105pub trait Sink<Item> {
106 type Done;
107
108 fn poll_forward(
112 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
113 ) -> Poll<Self::Done>;
114
115 #[inline(always)]
116 fn send(&mut self, item: Item) -> Send<'_, Self, Item>
117 where
118 Self: Unpin,
119 {
120 assert_future(Send::new(self, Poll::Ready(item)))
121 }
122
123 #[inline(always)]
124 fn send_all<'a, S: ?Sized>(&'a mut self, items: &'a mut S) -> SendAll<'a, Self, S>
125 where
126 S: Stream<Item = Item> + Unpin,
127 Self: Unpin,
128 {
129 assert_future(SendAll::new(self, items))
130 }
131
132 #[inline(always)]
133 fn done(&mut self) -> Done<'_, Self, Item>
134 where
135 Self: Unpin,
136 {
137 assert_future(Done::new(self))
138 }
139}
140
141#[inline(always)]
142fn assert_stream<S>(s: S) -> S
143where
144 S: Stream,
145{
146 s
147}
148#[inline(always)]
149fn assert_pipe<P, Input>(p: P) -> P
150where
151 P: Pipe<Input>,
152{
153 p
154}
155#[inline(always)]
156fn assert_sink<S, Item>(s: S) -> S
157where
158 S: Sink<Item>,
159{
160 s
161}
162#[inline(always)]
163fn assert_future<F>(f: F) -> F
164where
165 F: Future,
166{
167 f
168}
169
170#[pin_project]
171pub struct StreamPipe<S, P> {
172 #[pin]
173 stream: S,
174 #[pin]
175 pipe: P,
176}
177
178impl<S, P> Stream for StreamPipe<S, P>
179where
180 S: Stream,
181 P: Pipe<S::Item>,
182{
183 type Item = P::Output;
184
185 #[inline(always)]
186 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
187 let self_ = self.project();
188 self_.pipe.poll_next(cx, self_.stream)
189 }
190}
191
192#[pin_project]
193pub struct PipePipe<A, B> {
194 #[pin]
195 a: A,
196 #[pin]
197 b: B,
198}
199
200impl<A, B, Input> Pipe<Input> for PipePipe<A, B>
201where
202 A: Pipe<Input>,
203 B: Pipe<A::Output>,
204{
205 type Output = B::Output;
206
207 #[inline(always)]
208 fn poll_next(
209 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
210 ) -> Poll<Option<Self::Output>> {
211 let self_ = self.project();
212 let stream = stream.pipe(self_.a);
213 pin_mut!(stream);
214 self_.b.poll_next(cx, stream)
215 }
216}
217
218#[pin_project]
219pub struct PipeSink<P, S> {
220 #[pin]
221 pipe: P,
222 #[pin]
223 sink: S,
224}
225
226impl<P, S, Item> Sink<Item> for PipeSink<P, S>
227where
228 P: Pipe<Item>,
229 S: Sink<P::Output>,
230{
231 type Done = S::Done;
232
233 #[inline(always)]
234 fn poll_forward(
235 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
236 ) -> Poll<Self::Done> {
237 let self_ = self.project();
238 let stream = stream.pipe(self_.pipe);
239 pin_mut!(stream);
240 self_.sink.poll_forward(cx, stream)
241 }
242}
243
244#[pin_project]
245pub struct StreamSink<A, B> {
246 #[pin]
247 stream: A,
248 #[pin]
249 sink: B,
250}
251
252impl<A, B> Future for StreamSink<A, B>
253where
254 A: Stream,
255 B: Sink<A::Item>,
256{
257 type Output = B::Done;
258
259 #[inline(always)]
260 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
261 let self_ = self.project();
262 self_.sink.poll_forward(cx, self_.stream)
263 }
264}
265
266#[pin_project]
267#[derive(new)]
268pub struct Send<'a, S: ?Sized, Item> {
269 sink: &'a mut S,
270 item: Poll<Item>,
271}
272impl<S: ?Sized + Sink<Item> + Unpin, Item> Future for Send<'_, S, Item> {
273 type Output = Option<S::Done>;
274
275 #[inline(always)]
276 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
277 let self_ = self.project();
278 let item = self_.item;
279 let stream = stream::poll_fn(|_| mem::replace(item, Poll::Pending).map(Some));
280 pin_mut!(stream);
281 if let Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) {
282 return Poll::Ready(Some(done));
283 }
284 if item.is_pending() {
285 Poll::Ready(None)
286 } else {
287 Poll::Pending
288 }
289 }
290}
291
292#[pin_project]
293#[derive(new)]
294pub struct SendAll<'a, S: ?Sized, St: ?Sized> {
295 sink: &'a mut S,
296 items: &'a mut St,
297}
298impl<S: ?Sized + Sink<Item> + Unpin, St: ?Sized + Stream<Item = Item> + Unpin, Item> Future
299 for SendAll<'_, S, St>
300{
301 type Output = Option<S::Done>;
302
303 #[inline(always)]
304 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
305 let self_ = self.project();
306 let items = &mut **self_.items;
307 let mut given_all = false;
308 let stream = stream::poll_fn(|cx| match Pin::new(&mut *items).poll_next(cx) {
309 x @ Poll::Ready(Some(_)) | x @ Poll::Pending => x,
310 Poll::Ready(None) => {
311 given_all = true;
312 Poll::Pending
313 }
314 });
315 pin_mut!(stream);
316 if let Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) {
317 return Poll::Ready(Some(done));
318 }
319 if given_all {
320 Poll::Ready(None)
321 } else {
322 Poll::Pending
323 }
324 }
325}
326
327#[pin_project]
328#[derive(new)]
329pub struct Done<'a, S: ?Sized, Item: ?Sized> {
330 sink: &'a mut S,
331 marker: PhantomData<fn() -> Item>,
332}
333impl<S: ?Sized + Sink<Item> + Unpin, Item> Future for Done<'_, S, Item> {
334 type Output = S::Done;
335
336 #[inline(always)]
337 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
338 let stream = stream::empty();
339 pin_mut!(stream);
340 Pin::new(&mut self.sink).poll_forward(cx, stream)
341 }
342}
343
344impl<P, Input> Pipe<Input> for Pin<P>
345where
346 P: DerefMut + Unpin,
347 P::Target: Pipe<Input>,
348{
349 type Output = <P::Target as Pipe<Input>>::Output;
350
351 #[inline(always)]
352 fn poll_next(
353 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
354 ) -> Poll<Option<Self::Output>> {
355 self.get_mut().as_mut().poll_next(cx, stream)
356 }
357}
358
359impl<T: ?Sized, Input> Pipe<Input> for &mut T
360where
361 T: Pipe<Input> + Unpin,
362{
363 type Output = T::Output;
364
365 #[inline(always)]
366 fn poll_next(
367 mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
368 ) -> Poll<Option<Self::Output>> {
369 Pin::new(&mut **self).poll_next(cx, stream)
370 }
371}
372
373impl<P, Item> Sink<Item> for Pin<P>
374where
375 P: DerefMut + Unpin,
376 P::Target: Sink<Item>,
377{
378 type Done = <P::Target as Sink<Item>>::Done;
379
380 #[inline(always)]
381 fn poll_forward(
382 self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
383 ) -> Poll<Self::Done> {
384 self.get_mut().as_mut().poll_forward(cx, stream)
385 }
386}
387
388impl<T: ?Sized, Item> Sink<Item> for &mut T
389where
390 T: Sink<Item> + Unpin,
391{
392 type Done = T::Done;
393
394 #[inline(always)]
395 fn poll_forward(
396 mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
397 ) -> Poll<Self::Done> {
398 Pin::new(&mut **self).poll_forward(cx, stream)
399 }
400}