par_stream/builder/
future_factory.rs

1use crate::common::*;
2
3pub type BoxFutureFactory<'a, In, Out> = Box<dyn 'a + FnMut(In) -> BoxFuture<'static, Out> + Send>;
4
5pub use future_factory_::*;
6mod future_factory_ {
7    use super::*;
8
9    pub trait FutureFactory<In>
10    where
11        In: 'static + Send,
12        Self::Fut: 'static + Send + Future,
13        <Self::Fut as Future>::Output: 'static + Send,
14    {
15        type Fut;
16
17        fn generate(&mut self, input: In) -> Self::Fut;
18
19        fn boxed<'a>(mut self) -> BoxFutureFactory<'a, In, <Self::Fut as Future>::Output>
20        where
21            Self: 'a + Sized + Send,
22        {
23            Box::new(move |input: In| self.generate(input).boxed())
24        }
25
26        fn compose<G>(self, other: G) -> ComposeFutureFactory<In, Self, G>
27        where
28            Self: Sized,
29            G: 'static + Send + Clone + FutureFactory<<Self::Fut as Future>::Output>,
30            <G::Fut as Future>::Output: Send,
31        {
32            ComposeFutureFactory::new(self, other)
33        }
34    }
35
36    impl<F, In, Fut> FutureFactory<In> for F
37    where
38        F: FnMut(In) -> Fut,
39        In: 'static + Send,
40        Fut: 'static + Send + Future,
41        Fut::Output: 'static + Send,
42    {
43        type Fut = Fut;
44
45        fn generate(&mut self, input: In) -> Self::Fut {
46            self(input)
47        }
48    }
49}
50
51pub use compose_future_factory::*;
52mod compose_future_factory {
53    use super::*;
54
55    pub struct ComposeFutureFactory<In, F, G>
56    where
57        F: FutureFactory<In>,
58        G: FutureFactory<<F::Fut as Future>::Output>,
59        In: 'static + Send,
60        <F::Fut as Future>::Output: 'static + Send,
61        <G::Fut as Future>::Output: 'static + Send,
62        F::Fut: 'static + Send + Future,
63        G::Fut: 'static + Send + Future,
64    {
65        f: F,
66        g: G,
67        _phantom: PhantomData<In>,
68    }
69
70    impl<In, F, G> ComposeFutureFactory<In, F, G>
71    where
72        F: FutureFactory<In>,
73        G: FutureFactory<<F::Fut as Future>::Output>,
74        In: 'static + Send,
75        <F::Fut as Future>::Output: 'static + Send,
76        <G::Fut as Future>::Output: 'static + Send,
77        F::Fut: 'static + Send + Future,
78        G::Fut: 'static + Send + Future,
79    {
80        pub fn new(f: F, g: G) -> Self {
81            Self {
82                f,
83                g,
84                _phantom: PhantomData,
85            }
86        }
87    }
88
89    impl<In, F, G> FutureFactory<In> for ComposeFutureFactory<In, F, G>
90    where
91        F: FutureFactory<In>,
92        G: 'static + Send + Clone + FutureFactory<<F::Fut as Future>::Output>,
93        In: 'static + Send,
94        <F::Fut as Future>::Output: 'static + Send,
95        <G::Fut as Future>::Output: 'static + Send,
96        F::Fut: 'static + Send + Future,
97        G::Fut: 'static + Send + Future,
98    {
99        type Fut = FutFacChain<F::Fut, G>;
100
101        fn generate(&mut self, input: In) -> FutFacChain<F::Fut, G> {
102            FutFacChain::new(self.f.generate(input), self.g.clone())
103        }
104    }
105
106    impl<In, F, G> Clone for ComposeFutureFactory<In, F, G>
107    where
108        F: Clone + FutureFactory<In>,
109        G: Send + Clone + FutureFactory<<F::Fut as Future>::Output>,
110        In: 'static + Send,
111        <F::Fut as Future>::Output: 'static + Send,
112        <G::Fut as Future>::Output: 'static + Send,
113        F::Fut: 'static + Send + Future,
114        G::Fut: 'static + Send + Future,
115    {
116        fn clone(&self) -> Self {
117            Self {
118                f: self.f.clone(),
119                g: self.g.clone(),
120                _phantom: PhantomData,
121            }
122        }
123    }
124}
125
126pub use fut_fac_chain::*;
127mod fut_fac_chain {
128    use super::*;
129
130    #[pin_project]
131    pub struct FutFacChain<Fut, Fac>
132    where
133        Fut::Output: 'static + Send,
134        <Fac::Fut as Future>::Output: 'static + Send,
135        Fut: Future,
136        Fac::Fut: 'static + Send + Future,
137        Fac: FutureFactory<Fut::Output>,
138    {
139        #[pin]
140        ffut: Option<Fut>,
141        #[pin]
142        gfut: Option<Fac::Fut>,
143        fac: Fac,
144    }
145
146    impl<Fut, Fac> FutFacChain<Fut, Fac>
147    where
148        Fut::Output: 'static + Send,
149        <Fac::Fut as Future>::Output: 'static + Send,
150        Fut: Future,
151        Fac::Fut: 'static + Send + Future,
152        Fac: FutureFactory<Fut::Output>,
153    {
154        pub fn new(fut: Fut, fac: Fac) -> Self {
155            Self {
156                ffut: Some(fut),
157                gfut: None,
158                fac,
159            }
160        }
161    }
162
163    impl<Fut, Fac> Future for FutFacChain<Fut, Fac>
164    where
165        Fut::Output: 'static + Send,
166        <Fac::Fut as Future>::Output: 'static + Send,
167        Fut: Future,
168        Fac::Fut: 'static + Send + Future,
169        Fac: FutureFactory<Fut::Output>,
170    {
171        type Output = <Fac::Fut as Future>::Output;
172
173        fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
174            let mut this = self.project();
175
176            Ready(loop {
177                if let Some(ffut) = this.ffut.as_mut().as_pin_mut() {
178                    let fout = ready!(ffut.poll(cx));
179                    this.ffut.set(None);
180                    let gfut = this.fac.generate(fout);
181                    this.gfut.set(Some(gfut))
182                } else if let Some(gfut) = this.gfut.as_mut().as_pin_mut() {
183                    let gout = ready!(gfut.poll(cx));
184                    this.gfut.set(None);
185                    break gout;
186                } else {
187                    unreachable!()
188                }
189            })
190        }
191    }
192}