requiem_service/
apply.rs

1use std::future::Future;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
7
8/// Apply tranform function to a service.
9pub fn apply_fn<T, F, R, In, Out, Err, U>(service: U, f: F) -> Apply<T, F, R, In, Out, Err>
10where
11    T: Service<Error = Err>,
12    F: FnMut(In, &mut T) -> R,
13    R: Future<Output = Result<Out, Err>>,
14    U: IntoService<T>,
15{
16    Apply::new(service.into_service(), f)
17}
18
19/// Service factory that prodices `apply_fn` service.
20pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
21    service: U,
22    f: F,
23) -> ApplyServiceFactory<T, F, R, In, Out, Err>
24where
25    T: ServiceFactory<Error = Err>,
26    F: FnMut(In, &mut T::Service) -> R + Clone,
27    R: Future<Output = Result<Out, Err>>,
28    U: IntoServiceFactory<T>,
29{
30    ApplyServiceFactory::new(service.into_factory(), f)
31}
32
33/// `Apply` service combinator
34pub struct Apply<T, F, R, In, Out, Err>
35where
36    T: Service<Error = Err>,
37{
38    service: T,
39    f: F,
40    r: PhantomData<(In, Out, R)>,
41}
42
43impl<T, F, R, In, Out, Err> Apply<T, F, R, In, Out, Err>
44where
45    T: Service<Error = Err>,
46    F: FnMut(In, &mut T) -> R,
47    R: Future<Output = Result<Out, Err>>,
48{
49    /// Create new `Apply` combinator
50    fn new(service: T, f: F) -> Self {
51        Self {
52            service,
53            f,
54            r: PhantomData,
55        }
56    }
57}
58
59impl<T, F, R, In, Out, Err> Clone for Apply<T, F, R, In, Out, Err>
60where
61    T: Service<Error = Err> + Clone,
62    F: FnMut(In, &mut T) -> R + Clone,
63    R: Future<Output = Result<Out, Err>>,
64{
65    fn clone(&self) -> Self {
66        Apply {
67            service: self.service.clone(),
68            f: self.f.clone(),
69            r: PhantomData,
70        }
71    }
72}
73
74impl<T, F, R, In, Out, Err> Service for Apply<T, F, R, In, Out, Err>
75where
76    T: Service<Error = Err>,
77    F: FnMut(In, &mut T) -> R,
78    R: Future<Output = Result<Out, Err>>,
79{
80    type Request = In;
81    type Response = Out;
82    type Error = Err;
83    type Future = R;
84
85    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86        Poll::Ready(futures_util::ready!(self.service.poll_ready(cx)))
87    }
88
89    fn call(&mut self, req: In) -> Self::Future {
90        (self.f)(req, &mut self.service)
91    }
92}
93
94/// `apply()` service factory
95pub struct ApplyServiceFactory<T, F, R, In, Out, Err>
96where
97    T: ServiceFactory<Error = Err>,
98    F: FnMut(In, &mut T::Service) -> R + Clone,
99    R: Future<Output = Result<Out, Err>>,
100{
101    service: T,
102    f: F,
103    r: PhantomData<(R, In, Out)>,
104}
105
106impl<T, F, R, In, Out, Err> ApplyServiceFactory<T, F, R, In, Out, Err>
107where
108    T: ServiceFactory<Error = Err>,
109    F: FnMut(In, &mut T::Service) -> R + Clone,
110    R: Future<Output = Result<Out, Err>>,
111{
112    /// Create new `ApplyNewService` new service instance
113    fn new(service: T, f: F) -> Self {
114        Self {
115            f,
116            service,
117            r: PhantomData,
118        }
119    }
120}
121
122impl<T, F, R, In, Out, Err> Clone for ApplyServiceFactory<T, F, R, In, Out, Err>
123where
124    T: ServiceFactory<Error = Err> + Clone,
125    F: FnMut(In, &mut T::Service) -> R + Clone,
126    R: Future<Output = Result<Out, Err>>,
127{
128    fn clone(&self) -> Self {
129        Self {
130            service: self.service.clone(),
131            f: self.f.clone(),
132            r: PhantomData,
133        }
134    }
135}
136
137impl<T, F, R, In, Out, Err> ServiceFactory for ApplyServiceFactory<T, F, R, In, Out, Err>
138where
139    T: ServiceFactory<Error = Err>,
140    F: FnMut(In, &mut T::Service) -> R + Clone,
141    R: Future<Output = Result<Out, Err>>,
142{
143    type Request = In;
144    type Response = Out;
145    type Error = Err;
146
147    type Config = T::Config;
148    type Service = Apply<T::Service, F, R, In, Out, Err>;
149    type InitError = T::InitError;
150    type Future = ApplyServiceFactoryResponse<T, F, R, In, Out, Err>;
151
152    fn new_service(&self, cfg: T::Config) -> Self::Future {
153        ApplyServiceFactoryResponse::new(self.service.new_service(cfg), self.f.clone())
154    }
155}
156
157#[pin_project::pin_project]
158pub struct ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
159where
160    T: ServiceFactory<Error = Err>,
161    F: FnMut(In, &mut T::Service) -> R,
162    R: Future<Output = Result<Out, Err>>,
163{
164    #[pin]
165    fut: T::Future,
166    f: Option<F>,
167    r: PhantomData<(In, Out)>,
168}
169
170impl<T, F, R, In, Out, Err> ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
171where
172    T: ServiceFactory<Error = Err>,
173    F: FnMut(In, &mut T::Service) -> R,
174    R: Future<Output = Result<Out, Err>>,
175{
176    fn new(fut: T::Future, f: F) -> Self {
177        Self {
178            f: Some(f),
179            fut,
180            r: PhantomData,
181        }
182    }
183}
184
185impl<T, F, R, In, Out, Err> Future for ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
186where
187    T: ServiceFactory<Error = Err>,
188    F: FnMut(In, &mut T::Service) -> R,
189    R: Future<Output = Result<Out, Err>>,
190{
191    type Output = Result<Apply<T::Service, F, R, In, Out, Err>, T::InitError>;
192
193    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
194        let this = self.project();
195
196        if let Poll::Ready(svc) = this.fut.poll(cx)? {
197            Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap())))
198        } else {
199            Poll::Pending
200        }
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use std::task::{Context, Poll};
207
208    use futures_util::future::{lazy, ok, Ready};
209
210    use super::*;
211    use crate::{pipeline, pipeline_factory, Service, ServiceFactory};
212
213    #[derive(Clone)]
214    struct Srv;
215
216    impl Service for Srv {
217        type Request = ();
218        type Response = ();
219        type Error = ();
220        type Future = Ready<Result<(), ()>>;
221
222        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
223            Poll::Ready(Ok(()))
224        }
225
226        fn call(&mut self, _: ()) -> Self::Future {
227            ok(())
228        }
229    }
230
231    #[actix_rt::test]
232    async fn test_call() {
233        let mut srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
234            let fut = srv.call(());
235            async move {
236                let res = fut.await.unwrap();
237                Ok((req, res))
238            }
239        }));
240
241        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
242
243        let res = srv.call("srv").await;
244        assert!(res.is_ok());
245        assert_eq!(res.unwrap(), (("srv", ())));
246    }
247
248    #[actix_rt::test]
249    async fn test_new_service() {
250        let new_srv = pipeline_factory(apply_fn_factory(
251            || ok::<_, ()>(Srv),
252            |req: &'static str, srv| {
253                let fut = srv.call(());
254                async move {
255                    let res = fut.await.unwrap();
256                    Ok((req, res))
257                }
258            },
259        ));
260
261        let mut srv = new_srv.new_service(()).await.unwrap();
262
263        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
264
265        let res = srv.call("srv").await;
266        assert!(res.is_ok());
267        assert_eq!(res.unwrap(), (("srv", ())));
268    }
269}