amadeus_core/par_stream/
flat_map.rs1use derive_new::new;
2use futures::Stream;
3use pin_project::pin_project;
4use serde::{Deserialize, Serialize};
5use serde_closure::traits::FnMut;
6use std::{
7 pin::Pin, task::{Context, Poll}
8};
9
10use super::{ParallelPipe, ParallelStream, PipeTask, StreamTask};
11
12#[pin_project]
13#[derive(new)]
14#[must_use]
15pub struct FlatMap<P, F> {
16 #[pin]
17 pipe: P,
18 f: F,
19}
20
21impl_par_dist! {
22 impl<P: ParallelStream, F, R: Stream> ParallelStream for FlatMap<P, F>
23 where
24 F: FnMut<(P::Item,), Output = R> + Clone + Send + 'static,
25 {
26 type Item = R::Item;
27 type Task = FlatMapTask<P::Task, F>;
28
29 fn size_hint(&self) -> (usize, Option<usize>) {
30 (0, None)
31 }
32 fn next_task(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Task>> {
33 let self_ = self.project();
34 let f = self_.f;
35 self_.pipe.next_task(cx).map(|task| {
36 task.map(|task| {
37 let f = f.clone();
38 FlatMapTask { task, f }
39 })
40 })
41 }
42 }
43
44 impl<P: ParallelPipe<Input>, F, R: Stream, Input> ParallelPipe<Input> for FlatMap<P, F>
45 where
46 F: FnMut<(P::Output,), Output = R> + Clone + Send + 'static,
47 {
48 type Output = R::Item;
49 type Task = FlatMapTask<P::Task, F>;
50
51 fn task(&self) -> Self::Task {
52 let task = self.pipe.task();
53 let f = self.f.clone();
54 FlatMapTask { task, f }
55 }
56 }
57}
58
59#[derive(Serialize, Deserialize)]
60pub struct FlatMapTask<C, F> {
61 task: C,
62 f: F,
63}
64impl<C: StreamTask, F: FnMut<(C::Item,), Output = R> + Clone, R: Stream> StreamTask
65 for FlatMapTask<C, F>
66{
67 type Item = R::Item;
68 type Async = crate::pipe::FlatMap<C::Async, F, R>;
69
70 fn into_async(self) -> Self::Async {
71 crate::pipe::FlatMap::new(self.task.into_async(), self.f)
72 }
73}
74impl<C: PipeTask<Input>, F: FnMut<(C::Output,), Output = R> + Clone, R: Stream, Input>
75 PipeTask<Input> for FlatMapTask<C, F>
76{
77 type Output = R::Item;
78 type Async = crate::pipe::FlatMap<C::Async, F, R>;
79
80 fn into_async(self) -> Self::Async {
81 crate::pipe::FlatMap::new(self.task.into_async(), self.f)
82 }
83}