requiem_service/
pipeline.rs

1use std::future::Future;
2use std::task::{Context, Poll};
3
4use crate::and_then::{AndThenService, AndThenServiceFactory};
5use crate::and_then_apply_fn::{AndThenApplyFn, AndThenApplyFnFactory};
6use crate::map::{Map, MapServiceFactory};
7use crate::map_err::{MapErr, MapErrServiceFactory};
8use crate::map_init_err::MapInitErr;
9use crate::then::{ThenService, ThenServiceFactory};
10use crate::{IntoService, IntoServiceFactory, Service, ServiceFactory};
11
12/// Contruct new pipeline with one service in pipeline chain.
13pub fn pipeline<F, T>(service: F) -> Pipeline<T>
14where
15    F: IntoService<T>,
16    T: Service,
17{
18    Pipeline {
19        service: service.into_service(),
20    }
21}
22
23/// Contruct new pipeline factory with one service factory.
24pub fn pipeline_factory<T, F>(factory: F) -> PipelineFactory<T>
25where
26    T: ServiceFactory,
27    F: IntoServiceFactory<T>,
28{
29    PipelineFactory {
30        factory: factory.into_factory(),
31    }
32}
33
34/// Pipeline service - pipeline allows to compose multiple service into one service.
35pub struct Pipeline<T> {
36    service: T,
37}
38
39impl<T: Service> Pipeline<T> {
40    /// Call another service after call to this one has resolved successfully.
41    ///
42    /// This function can be used to chain two services together and ensure that
43    /// the second service isn't called until call to the fist service have
44    /// finished. Result of the call to the first service is used as an
45    /// input parameter for the second service's call.
46    ///
47    /// Note that this function consumes the receiving service and returns a
48    /// wrapped version of it.
49    pub fn and_then<F, U>(
50        self,
51        service: F,
52    ) -> Pipeline<
53        impl Service<Request = T::Request, Response = U::Response, Error = T::Error> + Clone,
54    >
55    where
56        Self: Sized,
57        F: IntoService<U>,
58        U: Service<Request = T::Response, Error = T::Error>,
59    {
60        Pipeline {
61            service: AndThenService::new(self.service, service.into_service()),
62        }
63    }
64
65    /// Apply function to specified service and use it as a next service in
66    /// chain.
67    ///
68    /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))`
69    pub fn and_then_apply_fn<U, I, F, Fut, Res, Err>(
70        self,
71        service: I,
72        f: F,
73    ) -> Pipeline<impl Service<Request = T::Request, Response = Res, Error = Err> + Clone>
74    where
75        Self: Sized,
76        I: IntoService<U>,
77        U: Service,
78        F: FnMut(T::Response, &mut U) -> Fut,
79        Fut: Future<Output = Result<Res, Err>>,
80        Err: From<T::Error> + From<U::Error>,
81    {
82        Pipeline {
83            service: AndThenApplyFn::new(self.service, service.into_service(), f),
84        }
85    }
86
87    /// Chain on a computation for when a call to the service finished,
88    /// passing the result of the call to the next service `U`.
89    ///
90    /// Note that this function consumes the receiving pipeline and returns a
91    /// wrapped version of it.
92    pub fn then<F, U>(
93        self,
94        service: F,
95    ) -> Pipeline<
96        impl Service<Request = T::Request, Response = U::Response, Error = T::Error> + Clone,
97    >
98    where
99        Self: Sized,
100        F: IntoService<U>,
101        U: Service<Request = Result<T::Response, T::Error>, Error = T::Error>,
102    {
103        Pipeline {
104            service: ThenService::new(self.service, service.into_service()),
105        }
106    }
107
108    /// Map this service's output to a different type, returning a new service
109    /// of the resulting type.
110    ///
111    /// This function is similar to the `Option::map` or `Iterator::map` where
112    /// it will change the type of the underlying service.
113    ///
114    /// Note that this function consumes the receiving service and returns a
115    /// wrapped version of it, similar to the existing `map` methods in the
116    /// standard library.
117    pub fn map<F, R>(self, f: F) -> Pipeline<Map<T, F, R>>
118    where
119        Self: Sized,
120        F: FnMut(T::Response) -> R,
121    {
122        Pipeline {
123            service: Map::new(self.service, f),
124        }
125    }
126
127    /// Map this service's error to a different error, returning a new service.
128    ///
129    /// This function is similar to the `Result::map_err` where it will change
130    /// the error type of the underlying service. This is useful for example to
131    /// ensure that services have the same error type.
132    ///
133    /// Note that this function consumes the receiving service and returns a
134    /// wrapped version of it.
135    pub fn map_err<F, E>(self, f: F) -> Pipeline<MapErr<T, F, E>>
136    where
137        Self: Sized,
138        F: Fn(T::Error) -> E,
139    {
140        Pipeline {
141            service: MapErr::new(self.service, f),
142        }
143    }
144}
145
146impl<T> Clone for Pipeline<T>
147where
148    T: Clone,
149{
150    fn clone(&self) -> Self {
151        Pipeline {
152            service: self.service.clone(),
153        }
154    }
155}
156
157impl<T: Service> Service for Pipeline<T> {
158    type Request = T::Request;
159    type Response = T::Response;
160    type Error = T::Error;
161    type Future = T::Future;
162
163    #[inline]
164    fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), T::Error>> {
165        self.service.poll_ready(ctx)
166    }
167
168    #[inline]
169    fn call(&mut self, req: T::Request) -> Self::Future {
170        self.service.call(req)
171    }
172}
173
174/// Pipeline factory
175pub struct PipelineFactory<T> {
176    factory: T,
177}
178
179impl<T: ServiceFactory> PipelineFactory<T> {
180    /// Call another service after call to this one has resolved successfully.
181    pub fn and_then<F, U>(
182        self,
183        factory: F,
184    ) -> PipelineFactory<
185        impl ServiceFactory<
186                Request = T::Request,
187                Response = U::Response,
188                Error = T::Error,
189                Config = T::Config,
190                InitError = T::InitError,
191                Service = impl Service<
192                    Request = T::Request,
193                    Response = U::Response,
194                    Error = T::Error,
195                > + Clone,
196            > + Clone,
197    >
198    where
199        Self: Sized,
200        T::Config: Clone,
201        F: IntoServiceFactory<U>,
202        U: ServiceFactory<
203            Config = T::Config,
204            Request = T::Response,
205            Error = T::Error,
206            InitError = T::InitError,
207        >,
208    {
209        PipelineFactory {
210            factory: AndThenServiceFactory::new(self.factory, factory.into_factory()),
211        }
212    }
213
214    /// Apply function to specified service and use it as a next service in
215    /// chain.
216    ///
217    /// Short version of `pipeline_factory(...).and_then(apply_fn_factory(...))`
218    pub fn and_then_apply_fn<U, I, F, Fut, Res, Err>(
219        self,
220        factory: I,
221        f: F,
222    ) -> PipelineFactory<
223        impl ServiceFactory<
224                Request = T::Request,
225                Response = Res,
226                Error = Err,
227                Config = T::Config,
228                InitError = T::InitError,
229                Service = impl Service<Request = T::Request, Response = Res, Error = Err> + Clone,
230            > + Clone,
231    >
232    where
233        Self: Sized,
234        T::Config: Clone,
235        I: IntoServiceFactory<U>,
236        U: ServiceFactory<Config = T::Config, InitError = T::InitError>,
237        F: FnMut(T::Response, &mut U::Service) -> Fut + Clone,
238        Fut: Future<Output = Result<Res, Err>>,
239        Err: From<T::Error> + From<U::Error>,
240    {
241        PipelineFactory {
242            factory: AndThenApplyFnFactory::new(self.factory, factory.into_factory(), f),
243        }
244    }
245
246    /// Create `NewService` to chain on a computation for when a call to the
247    /// service finished, passing the result of the call to the next
248    /// service `U`.
249    ///
250    /// Note that this function consumes the receiving pipeline and returns a
251    /// wrapped version of it.
252    pub fn then<F, U>(
253        self,
254        factory: F,
255    ) -> PipelineFactory<
256        impl ServiceFactory<
257                Request = T::Request,
258                Response = U::Response,
259                Error = T::Error,
260                Config = T::Config,
261                InitError = T::InitError,
262                Service = impl Service<
263                    Request = T::Request,
264                    Response = U::Response,
265                    Error = T::Error,
266                > + Clone,
267            > + Clone,
268    >
269    where
270        Self: Sized,
271        T::Config: Clone,
272        F: IntoServiceFactory<U>,
273        U: ServiceFactory<
274            Config = T::Config,
275            Request = Result<T::Response, T::Error>,
276            Error = T::Error,
277            InitError = T::InitError,
278        >,
279    {
280        PipelineFactory {
281            factory: ThenServiceFactory::new(self.factory, factory.into_factory()),
282        }
283    }
284
285    /// Map this service's output to a different type, returning a new service
286    /// of the resulting type.
287    pub fn map<F, R>(self, f: F) -> PipelineFactory<MapServiceFactory<T, F, R>>
288    where
289        Self: Sized,
290        F: FnMut(T::Response) -> R + Clone,
291    {
292        PipelineFactory {
293            factory: MapServiceFactory::new(self.factory, f),
294        }
295    }
296
297    /// Map this service's error to a different error, returning a new service.
298    pub fn map_err<F, E>(self, f: F) -> PipelineFactory<MapErrServiceFactory<T, F, E>>
299    where
300        Self: Sized,
301        F: Fn(T::Error) -> E + Clone,
302    {
303        PipelineFactory {
304            factory: MapErrServiceFactory::new(self.factory, f),
305        }
306    }
307
308    /// Map this factory's init error to a different error, returning a new service.
309    pub fn map_init_err<F, E>(self, f: F) -> PipelineFactory<MapInitErr<T, F, E>>
310    where
311        Self: Sized,
312        F: Fn(T::InitError) -> E + Clone,
313    {
314        PipelineFactory {
315            factory: MapInitErr::new(self.factory, f),
316        }
317    }
318}
319
320impl<T> Clone for PipelineFactory<T>
321where
322    T: Clone,
323{
324    fn clone(&self) -> Self {
325        PipelineFactory {
326            factory: self.factory.clone(),
327        }
328    }
329}
330
331impl<T: ServiceFactory> ServiceFactory for PipelineFactory<T> {
332    type Config = T::Config;
333    type Request = T::Request;
334    type Response = T::Response;
335    type Error = T::Error;
336    type Service = T::Service;
337    type InitError = T::InitError;
338    type Future = T::Future;
339
340    #[inline]
341    fn new_service(&self, cfg: T::Config) -> Self::Future {
342        self.factory.new_service(cfg)
343    }
344}