amadeus_core/par_stream/
flat_map.rs

1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use serde_closure::traits::FnMut;
6use std::{
7	pin::Pin, task::{Context, Poll}
8};
9
10use super::{ParallelPipe, ParallelStream, PipeTask, StreamTask};
11
12#[pin_project]
13#[derive(new)]
14#[must_use]
15pub struct FlatMap<P, F> {
16	#[pin]
17	pipe: P,
18	f: F,
19}
20
21impl_par_dist! {
22	impl<P: ParallelStream, F, R: Stream> ParallelStream for FlatMap<P, F>
23	where
24		F: FnMut<(P::Item,), Output = R> + Clone + Send + 'static,
25	{
26		type Item = R::Item;
27		type Task = FlatMapTask<P::Task, F>;
28
29		fn size_hint(&self) -> (usize, Option<usize>) {
30			(0, None)
31		}
32		fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
33			let self_ = self.project();
34			let f = self_.f;
35			self_.pipe.next_task(cx).map(|task| {
36				task.map(|task| {
37					let f = f.clone();
38					FlatMapTask { task, f }
39				})
40			})
41		}
42	}
43
44	impl<P: ParallelPipe<Input>, F, R: Stream, Input> ParallelPipe<Input> for FlatMap<P, F>
45	where
46		F: FnMut<(P::Output,), Output = R> + Clone + Send + 'static,
47	{
48		type Output = R::Item;
49		type Task = FlatMapTask<P::Task, F>;
50
51		fn task(&self) -> Self::Task {
52			let task = self.pipe.task();
53			let f = self.f.clone();
54			FlatMapTask { task, f }
55		}
56	}
57}
58
59#[derive(Serialize, Deserialize)]
60pub struct FlatMapTask<C, F> {
61	task: C,
62	f: F,
63}
64impl<C: StreamTask, F: FnMut<(C::Item,), Output = R> + Clone, R: Stream> StreamTask
65	for FlatMapTask<C, F>
66{
67	type Item = R::Item;
68	type Async = crate::pipe::FlatMap<C::Async, F, R>;
69
70	fn into_async(self) -> Self::Async {
71		crate::pipe::FlatMap::new(self.task.into_async(), self.f)
72	}
73}
74impl<C: PipeTask<Input>, F: FnMut<(C::Output,), Output = R> + Clone, R: Stream, Input>
75	PipeTask<Input> for FlatMapTask<C, F>
76{
77	type Output = R::Item;
78	type Async = crate::pipe::FlatMap<C::Async, F, R>;
79
80	fn into_async(self) -> Self::Async {
81		crate::pipe::FlatMap::new(self.task.into_async(), self.f)
82	}
83}