amadeus_core/
pipe.rs

1mod filter;
2mod filter_map_sync;
3mod flat_map;
4mod flat_map_sync;
5mod flatten;
6mod map;
7
8use derive_new::new;
9use futures::{pin_mut, stream, Future, Stream};
10use pin_project::pin_project;
11use std::{
12	marker::PhantomData, mem, ops::DerefMut, pin::Pin, task::{Context, Poll}
13};
14
15pub use self::{filter::*, filter_map_sync::*, flat_map::*, flat_map_sync::*, flatten::*, map::*};
16
17// Sink takes Input as an input parameter rather than associated type to accept
18// for<'a> &'a T, but this might not be necessary in future?
19// https://github.com/rust-lang/rust/issues/49601
20
21pub trait StreamExt: Stream {
22	#[inline(always)]
23	fn pipe<P>(self, pipe: P) -> StreamPipe<Self, P>
24	where
25		P: Pipe<Self::Item>,
26		Self: Sized,
27	{
28		assert_stream(StreamPipe { stream: self, pipe })
29	}
30
31	#[inline(always)]
32	fn sink<S>(self, sink: S) -> StreamSink<Self, S>
33	where
34		S: Sink<Self::Item>,
35		Self: Sized,
36	{
37		assert_future(StreamSink { stream: self, sink })
38	}
39}
40impl<S: ?Sized> StreamExt for S where S: Stream {}
41
42pub trait Pipe<Input> {
43	type Output;
44
45	fn poll_next(
46		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
47	) -> Poll<Option<Self::Output>>;
48
49	#[inline(always)]
50	fn pipe<P>(self, pipe: P) -> PipePipe<Self, P>
51	where
52		P: Pipe<Self::Output>,
53		Self: Sized,
54	{
55		assert_pipe(PipePipe { a: self, b: pipe })
56	}
57
58	#[inline(always)]
59	fn sink<S>(self, sink: S) -> PipeSink<Self, S>
60	where
61		S: Sink<Self::Output>,
62		Self: Sized,
63	{
64		assert_sink(PipeSink { pipe: self, sink })
65	}
66
67	#[inline(always)]
68	fn filter<F>(self, f: F) -> Filter<Self, F>
69	where
70		F: FnMut(&Self::Output) -> bool,
71		Self: Sized,
72	{
73		assert_pipe(Filter::new(self, f))
74	}
75
76	#[inline(always)]
77	fn flat_map<F, R>(self, f: F) -> FlatMap<Self, F, R>
78	where
79		F: FnMut(Self::Output) -> R,
80		R: Stream,
81		Self: Sized,
82	{
83		assert_pipe(FlatMap::new(self, f))
84	}
85
86	#[inline(always)]
87	fn flatten(self) -> Flatten<Self, Self::Output>
88	where
89		Self::Output: Stream,
90		Self: Sized,
91	{
92		assert_pipe(Flatten::new(self))
93	}
94
95	#[inline(always)]
96	fn map<F, R>(self, f: F) -> Map<Self, F>
97	where
98		F: FnMut(Self::Output) -> R,
99		Self: Sized,
100	{
101		assert_pipe(Map::new(self, f))
102	}
103}
104
105pub trait Sink<Item> {
106	type Done;
107
108	/// Returns `Poll::Ready` when a) it can't accept any more elements from `stream` and b) all
109	/// accepted elements have been fully processed. By convention, `stream` yielding `None`
110	/// typically triggers (a).
111	fn poll_forward(
112		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
113	) -> Poll<Self::Done>;
114
115	#[inline(always)]
116	fn send(&mut self, item: Item) -> Send<'_, Self, Item>
117	where
118		Self: Unpin,
119	{
120		assert_future(Send::new(self, Poll::Ready(item)))
121	}
122
123	#[inline(always)]
124	fn send_all<'a, S: ?Sized>(&'a mut self, items: &'a mut S) -> SendAll<'a, Self, S>
125	where
126		S: Stream<Item = Item> + Unpin,
127		Self: Unpin,
128	{
129		assert_future(SendAll::new(self, items))
130	}
131
132	#[inline(always)]
133	fn done(&mut self) -> Done<'_, Self, Item>
134	where
135		Self: Unpin,
136	{
137		assert_future(Done::new(self))
138	}
139}
140
141#[inline(always)]
142fn assert_stream<S>(s: S) -> S
143where
144	S: Stream,
145{
146	s
147}
148#[inline(always)]
149fn assert_pipe<P, Input>(p: P) -> P
150where
151	P: Pipe<Input>,
152{
153	p
154}
155#[inline(always)]
156fn assert_sink<S, Item>(s: S) -> S
157where
158	S: Sink<Item>,
159{
160	s
161}
162#[inline(always)]
163fn assert_future<F>(f: F) -> F
164where
165	F: Future,
166{
167	f
168}
169
170#[pin_project]
171pub struct StreamPipe<S, P> {
172	#[pin]
173	stream: S,
174	#[pin]
175	pipe: P,
176}
177
178impl<S, P> Stream for StreamPipe<S, P>
179where
180	S: Stream,
181	P: Pipe<S::Item>,
182{
183	type Item = P::Output;
184
185	#[inline(always)]
186	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
187		let self_ = self.project();
188		self_.pipe.poll_next(cx, self_.stream)
189	}
190}
191
192#[pin_project]
193pub struct PipePipe<A, B> {
194	#[pin]
195	a: A,
196	#[pin]
197	b: B,
198}
199
200impl<A, B, Input> Pipe<Input> for PipePipe<A, B>
201where
202	A: Pipe<Input>,
203	B: Pipe<A::Output>,
204{
205	type Output = B::Output;
206
207	#[inline(always)]
208	fn poll_next(
209		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
210	) -> Poll<Option<Self::Output>> {
211		let self_ = self.project();
212		let stream = stream.pipe(self_.a);
213		pin_mut!(stream);
214		self_.b.poll_next(cx, stream)
215	}
216}
217
218#[pin_project]
219pub struct PipeSink<P, S> {
220	#[pin]
221	pipe: P,
222	#[pin]
223	sink: S,
224}
225
226impl<P, S, Item> Sink<Item> for PipeSink<P, S>
227where
228	P: Pipe<Item>,
229	S: Sink<P::Output>,
230{
231	type Done = S::Done;
232
233	#[inline(always)]
234	fn poll_forward(
235		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
236	) -> Poll<Self::Done> {
237		let self_ = self.project();
238		let stream = stream.pipe(self_.pipe);
239		pin_mut!(stream);
240		self_.sink.poll_forward(cx, stream)
241	}
242}
243
244#[pin_project]
245pub struct StreamSink<A, B> {
246	#[pin]
247	stream: A,
248	#[pin]
249	sink: B,
250}
251
252impl<A, B> Future for StreamSink<A, B>
253where
254	A: Stream,
255	B: Sink<A::Item>,
256{
257	type Output = B::Done;
258
259	#[inline(always)]
260	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
261		let self_ = self.project();
262		self_.sink.poll_forward(cx, self_.stream)
263	}
264}
265
266#[pin_project]
267#[derive(new)]
268pub struct Send<'a, S: ?Sized, Item> {
269	sink: &'a mut S,
270	item: Poll<Item>,
271}
272impl<S: ?Sized + Sink<Item> + Unpin, Item> Future for Send<'_, S, Item> {
273	type Output = Option<S::Done>;
274
275	#[inline(always)]
276	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
277		let self_ = self.project();
278		let item = self_.item;
279		let stream = stream::poll_fn(|_| mem::replace(item, Poll::Pending).map(Some));
280		pin_mut!(stream);
281		if let Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) {
282			return Poll::Ready(Some(done));
283		}
284		if item.is_pending() {
285			Poll::Ready(None)
286		} else {
287			Poll::Pending
288		}
289	}
290}
291
292#[pin_project]
293#[derive(new)]
294pub struct SendAll<'a, S: ?Sized, St: ?Sized> {
295	sink: &'a mut S,
296	items: &'a mut St,
297}
298impl<S: ?Sized + Sink<Item> + Unpin, St: ?Sized + Stream<Item = Item> + Unpin, Item> Future
299	for SendAll<'_, S, St>
300{
301	type Output = Option<S::Done>;
302
303	#[inline(always)]
304	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
305		let self_ = self.project();
306		let items = &mut **self_.items;
307		let mut given_all = false;
308		let stream = stream::poll_fn(|cx| match Pin::new(&mut *items).poll_next(cx) {
309			x @ Poll::Ready(Some(_)) | x @ Poll::Pending => x,
310			Poll::Ready(None) => {
311				given_all = true;
312				Poll::Pending
313			}
314		});
315		pin_mut!(stream);
316		if let Poll::Ready(done) = Pin::new(self_.sink).poll_forward(cx, stream) {
317			return Poll::Ready(Some(done));
318		}
319		if given_all {
320			Poll::Ready(None)
321		} else {
322			Poll::Pending
323		}
324	}
325}
326
327#[pin_project]
328#[derive(new)]
329pub struct Done<'a, S: ?Sized, Item: ?Sized> {
330	sink: &'a mut S,
331	marker: PhantomData<fn() -> Item>,
332}
333impl<S: ?Sized + Sink<Item> + Unpin, Item> Future for Done<'_, S, Item> {
334	type Output = S::Done;
335
336	#[inline(always)]
337	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
338		let stream = stream::empty();
339		pin_mut!(stream);
340		Pin::new(&mut self.sink).poll_forward(cx, stream)
341	}
342}
343
344impl<P, Input> Pipe<Input> for Pin<P>
345where
346	P: DerefMut + Unpin,
347	P::Target: Pipe<Input>,
348{
349	type Output = <P::Target as Pipe<Input>>::Output;
350
351	#[inline(always)]
352	fn poll_next(
353		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
354	) -> Poll<Option<Self::Output>> {
355		self.get_mut().as_mut().poll_next(cx, stream)
356	}
357}
358
359impl<T: ?Sized, Input> Pipe<Input> for &mut T
360where
361	T: Pipe<Input> + Unpin,
362{
363	type Output = T::Output;
364
365	#[inline(always)]
366	fn poll_next(
367		mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Input>>,
368	) -> Poll<Option<Self::Output>> {
369		Pin::new(&mut **self).poll_next(cx, stream)
370	}
371}
372
373impl<P, Item> Sink<Item> for Pin<P>
374where
375	P: DerefMut + Unpin,
376	P::Target: Sink<Item>,
377{
378	type Done = <P::Target as Sink<Item>>::Done;
379
380	#[inline(always)]
381	fn poll_forward(
382		self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
383	) -> Poll<Self::Done> {
384		self.get_mut().as_mut().poll_forward(cx, stream)
385	}
386}
387
388impl<T: ?Sized, Item> Sink<Item> for &mut T
389where
390	T: Sink<Item> + Unpin,
391{
392	type Done = T::Done;
393
394	#[inline(always)]
395	fn poll_forward(
396		mut self: Pin<&mut Self>, cx: &mut Context, stream: Pin<&mut impl Stream<Item = Item>>,
397	) -> Poll<Self::Done> {
398		Pin::new(&mut **self).poll_forward(cx, stream)
399	}
400}