1use super::{util, Service, ServiceCtx, ServiceFactory};
2
3#[derive(Clone, Debug)]
4pub struct AndThen<A, B> {
9 svc1: A,
10 svc2: B,
11}
12
13impl<A, B> AndThen<A, B> {
14 pub(crate) fn new(svc1: A, svc2: B) -> Self {
16 Self { svc1, svc2 }
17 }
18}
19
20impl<A, B, Req> Service<Req> for AndThen<A, B>
21where
22 A: Service<Req>,
23 B: Service<A::Response, Error = A::Error>,
24{
25 type Response = B::Response;
26 type Error = A::Error;
27
28 #[inline]
29 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
30 util::ready(&self.svc1, &self.svc2, ctx).await
31 }
32
33 #[inline]
34 fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> {
35 self.svc1.poll(cx)?;
36 self.svc2.poll(cx)
37 }
38
39 #[inline]
40 async fn shutdown(&self) {
41 util::shutdown(&self.svc1, &self.svc2).await
42 }
43
44 #[inline]
45 async fn call(
46 &self,
47 req: Req,
48 ctx: ServiceCtx<'_, Self>,
49 ) -> Result<B::Response, A::Error> {
50 let res = ctx.call(&self.svc1, req).await?;
51 ctx.call(&self.svc2, res).await
52 }
53}
54
55#[derive(Debug, Clone)]
56pub struct AndThenFactory<A, B> {
58 svc1: A,
59 svc2: B,
60}
61
62impl<A, B> AndThenFactory<A, B> {
63 pub fn new(svc1: A, svc2: B) -> Self {
65 Self { svc1, svc2 }
66 }
67}
68
69impl<A, B, Req, Cfg> ServiceFactory<Req, Cfg> for AndThenFactory<A, B>
70where
71 A: ServiceFactory<Req, Cfg>,
72 B: ServiceFactory<A::Response, Cfg, Error = A::Error, InitError = A::InitError>,
73 Cfg: Clone,
74{
75 type Response = B::Response;
76 type Error = A::Error;
77
78 type Service = AndThen<A::Service, B::Service>;
79 type InitError = A::InitError;
80
81 #[inline]
82 async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
83 Ok(AndThen {
84 svc1: self.svc1.create(cfg.clone()).await?,
85 svc2: self.svc2.create(cfg).await?,
86 })
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use ntex_util::future::lazy;
93 use std::{cell::Cell, rc::Rc, task::Context};
94
95 use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
96
97 #[derive(Debug, Clone)]
98 struct Srv1(Rc<Cell<usize>>, Rc<Cell<usize>>);
99
100 impl Service<&'static str> for Srv1 {
101 type Response = &'static str;
102 type Error = ();
103
104 async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
105 self.0.set(self.0.get() + 1);
106 Ok(())
107 }
108
109 fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> {
110 self.0.set(self.0.get() + 1);
111 Ok(())
112 }
113
114 async fn call(
115 &self,
116 req: &'static str,
117 _: ServiceCtx<'_, Self>,
118 ) -> Result<Self::Response, ()> {
119 Ok(req)
120 }
121
122 async fn shutdown(&self) {
123 self.1.set(self.1.get() + 1);
124 }
125 }
126
127 #[derive(Debug, Clone)]
128 struct Srv2(Rc<Cell<usize>>, Rc<Cell<usize>>);
129
130 impl Service<&'static str> for Srv2 {
131 type Response = (&'static str, &'static str);
132 type Error = ();
133
134 async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
135 self.0.set(self.0.get() + 1);
136 Ok(())
137 }
138
139 fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> {
140 self.0.set(self.0.get() + 1);
141 Ok(())
142 }
143
144 async fn call(
145 &self,
146 req: &'static str,
147 _: ServiceCtx<'_, Self>,
148 ) -> Result<Self::Response, ()> {
149 Ok((req, "srv2"))
150 }
151
152 async fn shutdown(&self) {
153 self.1.set(self.1.get() + 1);
154 }
155 }
156
157 #[ntex::test]
158 async fn test_ready() {
159 let cnt = Rc::new(Cell::new(0));
160 let cnt_sht = Rc::new(Cell::new(0));
161 let srv = chain(Box::new(Srv1(cnt.clone(), cnt_sht.clone())))
162 .clone()
163 .and_then(crate::boxed::service(Srv2(cnt.clone(), cnt_sht.clone())))
164 .into_pipeline();
165 let res = srv.ready().await;
166 assert_eq!(res, Ok(()));
167 assert_eq!(cnt.get(), 2);
168
169 lazy(|cx| srv.clone().poll(cx)).await.unwrap();
170 assert_eq!(cnt.get(), 4);
171
172 srv.shutdown().await;
173 assert_eq!(cnt_sht.get(), 2);
174
175 assert!(format!("{:?}", srv).contains("AndThen"));
176 }
177
178 #[ntex::test]
179 async fn test_ready2() {
180 let cnt = Rc::new(Cell::new(0));
181 let srv = Box::new(
182 chain(Srv1(cnt.clone(), Rc::new(Cell::new(0))))
183 .and_then(Srv2(cnt.clone(), Rc::new(Cell::new(0)))),
184 )
185 .into_pipeline();
186 let res = srv.ready().await;
187 assert_eq!(res, Ok(()));
188 assert_eq!(cnt.get(), 2);
189 }
190
191 #[ntex::test]
192 async fn test_call() {
193 let cnt = Rc::new(Cell::new(0));
194 let srv = chain(Box::new(Srv1(cnt.clone(), Rc::new(Cell::new(0)))))
195 .and_then(Srv2(cnt, Rc::new(Cell::new(0))))
196 .into_pipeline();
197 let res = srv.call("srv1").await;
198 assert!(res.is_ok());
199 assert_eq!(res.unwrap(), ("srv1", "srv2"));
200 }
201
202 #[ntex::test]
203 async fn test_factory() {
204 let cnt = Rc::new(Cell::new(0));
205 let cnt2 = cnt.clone();
206 let new_srv = chain_factory(fn_factory(move || {
207 let cnt = cnt2.clone();
208 async move { Ok::<_, ()>(Srv1(cnt, Rc::new(Cell::new(0)))) }
209 }))
210 .and_then(fn_factory(move || {
211 let cnt = cnt.clone();
212 async move { Ok(Srv2(cnt.clone(), Rc::new(Cell::new(0)))) }
213 }))
214 .clone();
215
216 let srv = new_srv.pipeline(&()).await.unwrap();
217 let res = srv.call("srv1").await;
218 assert!(res.is_ok());
219 assert_eq!(res.unwrap(), ("srv1", "srv2"));
220 }
221}