1use std::future::Future;
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use super::{IntoService, IntoServiceFactory, Service, ServiceFactory};
7
8pub fn apply_fn<T, F, R, In, Out, Err, U>(service: U, f: F) -> Apply<T, F, R, In, Out, Err>
10where
11 T: Service<Error = Err>,
12 F: FnMut(In, &mut T) -> R,
13 R: Future<Output = Result<Out, Err>>,
14 U: IntoService<T>,
15{
16 Apply::new(service.into_service(), f)
17}
18
19pub fn apply_fn_factory<T, F, R, In, Out, Err, U>(
21 service: U,
22 f: F,
23) -> ApplyServiceFactory<T, F, R, In, Out, Err>
24where
25 T: ServiceFactory<Error = Err>,
26 F: FnMut(In, &mut T::Service) -> R + Clone,
27 R: Future<Output = Result<Out, Err>>,
28 U: IntoServiceFactory<T>,
29{
30 ApplyServiceFactory::new(service.into_factory(), f)
31}
32
33pub struct Apply<T, F, R, In, Out, Err>
35where
36 T: Service<Error = Err>,
37{
38 service: T,
39 f: F,
40 r: PhantomData<(In, Out, R)>,
41}
42
43impl<T, F, R, In, Out, Err> Apply<T, F, R, In, Out, Err>
44where
45 T: Service<Error = Err>,
46 F: FnMut(In, &mut T) -> R,
47 R: Future<Output = Result<Out, Err>>,
48{
49 fn new(service: T, f: F) -> Self {
51 Self {
52 service,
53 f,
54 r: PhantomData,
55 }
56 }
57}
58
59impl<T, F, R, In, Out, Err> Clone for Apply<T, F, R, In, Out, Err>
60where
61 T: Service<Error = Err> + Clone,
62 F: FnMut(In, &mut T) -> R + Clone,
63 R: Future<Output = Result<Out, Err>>,
64{
65 fn clone(&self) -> Self {
66 Apply {
67 service: self.service.clone(),
68 f: self.f.clone(),
69 r: PhantomData,
70 }
71 }
72}
73
74impl<T, F, R, In, Out, Err> Service for Apply<T, F, R, In, Out, Err>
75where
76 T: Service<Error = Err>,
77 F: FnMut(In, &mut T) -> R,
78 R: Future<Output = Result<Out, Err>>,
79{
80 type Request = In;
81 type Response = Out;
82 type Error = Err;
83 type Future = R;
84
85 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
86 Poll::Ready(futures_util::ready!(self.service.poll_ready(cx)))
87 }
88
89 fn call(&mut self, req: In) -> Self::Future {
90 (self.f)(req, &mut self.service)
91 }
92}
93
94pub struct ApplyServiceFactory<T, F, R, In, Out, Err>
96where
97 T: ServiceFactory<Error = Err>,
98 F: FnMut(In, &mut T::Service) -> R + Clone,
99 R: Future<Output = Result<Out, Err>>,
100{
101 service: T,
102 f: F,
103 r: PhantomData<(R, In, Out)>,
104}
105
106impl<T, F, R, In, Out, Err> ApplyServiceFactory<T, F, R, In, Out, Err>
107where
108 T: ServiceFactory<Error = Err>,
109 F: FnMut(In, &mut T::Service) -> R + Clone,
110 R: Future<Output = Result<Out, Err>>,
111{
112 fn new(service: T, f: F) -> Self {
114 Self {
115 f,
116 service,
117 r: PhantomData,
118 }
119 }
120}
121
122impl<T, F, R, In, Out, Err> Clone for ApplyServiceFactory<T, F, R, In, Out, Err>
123where
124 T: ServiceFactory<Error = Err> + Clone,
125 F: FnMut(In, &mut T::Service) -> R + Clone,
126 R: Future<Output = Result<Out, Err>>,
127{
128 fn clone(&self) -> Self {
129 Self {
130 service: self.service.clone(),
131 f: self.f.clone(),
132 r: PhantomData,
133 }
134 }
135}
136
137impl<T, F, R, In, Out, Err> ServiceFactory for ApplyServiceFactory<T, F, R, In, Out, Err>
138where
139 T: ServiceFactory<Error = Err>,
140 F: FnMut(In, &mut T::Service) -> R + Clone,
141 R: Future<Output = Result<Out, Err>>,
142{
143 type Request = In;
144 type Response = Out;
145 type Error = Err;
146
147 type Config = T::Config;
148 type Service = Apply<T::Service, F, R, In, Out, Err>;
149 type InitError = T::InitError;
150 type Future = ApplyServiceFactoryResponse<T, F, R, In, Out, Err>;
151
152 fn new_service(&self, cfg: T::Config) -> Self::Future {
153 ApplyServiceFactoryResponse::new(self.service.new_service(cfg), self.f.clone())
154 }
155}
156
157#[pin_project::pin_project]
158pub struct ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
159where
160 T: ServiceFactory<Error = Err>,
161 F: FnMut(In, &mut T::Service) -> R,
162 R: Future<Output = Result<Out, Err>>,
163{
164 #[pin]
165 fut: T::Future,
166 f: Option<F>,
167 r: PhantomData<(In, Out)>,
168}
169
170impl<T, F, R, In, Out, Err> ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
171where
172 T: ServiceFactory<Error = Err>,
173 F: FnMut(In, &mut T::Service) -> R,
174 R: Future<Output = Result<Out, Err>>,
175{
176 fn new(fut: T::Future, f: F) -> Self {
177 Self {
178 f: Some(f),
179 fut,
180 r: PhantomData,
181 }
182 }
183}
184
185impl<T, F, R, In, Out, Err> Future for ApplyServiceFactoryResponse<T, F, R, In, Out, Err>
186where
187 T: ServiceFactory<Error = Err>,
188 F: FnMut(In, &mut T::Service) -> R,
189 R: Future<Output = Result<Out, Err>>,
190{
191 type Output = Result<Apply<T::Service, F, R, In, Out, Err>, T::InitError>;
192
193 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
194 let this = self.project();
195
196 if let Poll::Ready(svc) = this.fut.poll(cx)? {
197 Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap())))
198 } else {
199 Poll::Pending
200 }
201 }
202}
203
#[cfg(test)]
mod tests {
    use std::task::{Context, Poll};

    use futures_util::future::{lazy, ok, Ready};

    use super::*;
    use crate::{pipeline, pipeline_factory, Service, ServiceFactory};

    /// Minimal inner service: always ready and answers every `()` request with `Ok(())`.
    #[derive(Clone)]
    struct Srv;

    impl Service for Srv {
        type Request = ();
        type Response = ();
        type Error = ();
        type Future = Ready<Result<(), ()>>;

        fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, _: ()) -> Self::Future {
            ok(())
        }
    }

    /// `apply_fn` forwards readiness and lets the function combine the request with the
    /// inner service's response.
    #[actix_rt::test]
    async fn test_call() {
        let mut srv = pipeline(apply_fn(Srv, |req: &'static str, srv| {
            let inner = srv.call(());
            async move {
                let unit = inner.await.unwrap();
                Ok((req, unit))
            }
        }));

        let readiness = lazy(|cx| srv.poll_ready(cx)).await;
        assert_eq!(readiness, Poll::Ready(Ok(())));

        let res = srv.call("srv").await;
        assert_eq!(res, Ok(("srv", ())));
    }

    /// `apply_fn_factory` builds services that behave like the ones from `apply_fn`.
    #[actix_rt::test]
    async fn test_new_service() {
        let new_srv = pipeline_factory(apply_fn_factory(
            || ok::<_, ()>(Srv),
            |req: &'static str, srv| {
                let inner = srv.call(());
                async move {
                    let unit = inner.await.unwrap();
                    Ok((req, unit))
                }
            },
        ));

        let mut srv = new_srv.new_service(()).await.unwrap();

        let readiness = lazy(|cx| srv.poll_ready(cx)).await;
        assert_eq!(readiness, Poll::Ready(Ok(())));

        let res = srv.call("srv").await;
        assert_eq!(res, Ok(("srv", ())));
    }
}