ntex_service/
and_then.rs

1use super::{util, Service, ServiceCtx, ServiceFactory};
2
3#[derive(Clone, Debug)]
4/// Service for the `and_then` combinator, chaining a computation onto the end
5/// of another service which completes successfully.
6///
7/// This is created by the `ServiceExt::and_then` method.
8pub struct AndThen<A, B> {
9    svc1: A,
10    svc2: B,
11}
12
13impl<A, B> AndThen<A, B> {
14    /// Create new `AndThen` combinator
15    pub(crate) fn new(svc1: A, svc2: B) -> Self {
16        Self { svc1, svc2 }
17    }
18}
19
20impl<A, B, Req> Service<Req> for AndThen<A, B>
21where
22    A: Service<Req>,
23    B: Service<A::Response, Error = A::Error>,
24{
25    type Response = B::Response;
26    type Error = A::Error;
27
28    #[inline]
29    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
30        util::ready(&self.svc1, &self.svc2, ctx).await
31    }
32
33    #[inline]
34    fn poll(&self, cx: &mut std::task::Context<'_>) -> Result<(), Self::Error> {
35        self.svc1.poll(cx)?;
36        self.svc2.poll(cx)
37    }
38
39    #[inline]
40    async fn shutdown(&self) {
41        util::shutdown(&self.svc1, &self.svc2).await
42    }
43
44    #[inline]
45    async fn call(
46        &self,
47        req: Req,
48        ctx: ServiceCtx<'_, Self>,
49    ) -> Result<B::Response, A::Error> {
50        let res = ctx.call(&self.svc1, req).await?;
51        ctx.call(&self.svc2, res).await
52    }
53}
54
55#[derive(Debug, Clone)]
56/// `.and_then()` service factory combinator
57pub struct AndThenFactory<A, B> {
58    svc1: A,
59    svc2: B,
60}
61
62impl<A, B> AndThenFactory<A, B> {
63    /// Create new `AndThenFactory` combinator
64    pub fn new(svc1: A, svc2: B) -> Self {
65        Self { svc1, svc2 }
66    }
67}
68
69impl<A, B, Req, Cfg> ServiceFactory<Req, Cfg> for AndThenFactory<A, B>
70where
71    A: ServiceFactory<Req, Cfg>,
72    B: ServiceFactory<A::Response, Cfg, Error = A::Error, InitError = A::InitError>,
73    Cfg: Clone,
74{
75    type Response = B::Response;
76    type Error = A::Error;
77
78    type Service = AndThen<A::Service, B::Service>;
79    type InitError = A::InitError;
80
81    #[inline]
82    async fn create(&self, cfg: Cfg) -> Result<Self::Service, Self::InitError> {
83        Ok(AndThen {
84            svc1: self.svc1.create(cfg.clone()).await?,
85            svc2: self.svc2.create(cfg).await?,
86        })
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use ntex_util::future::lazy;
93    use std::{cell::Cell, rc::Rc, task::Context};
94
95    use crate::{chain, chain_factory, fn_factory, Service, ServiceCtx};
96
97    #[derive(Debug, Clone)]
98    struct Srv1(Rc<Cell<usize>>, Rc<Cell<usize>>);
99
100    impl Service<&'static str> for Srv1 {
101        type Response = &'static str;
102        type Error = ();
103
104        async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
105            self.0.set(self.0.get() + 1);
106            Ok(())
107        }
108
109        fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> {
110            self.0.set(self.0.get() + 1);
111            Ok(())
112        }
113
114        async fn call(
115            &self,
116            req: &'static str,
117            _: ServiceCtx<'_, Self>,
118        ) -> Result<Self::Response, ()> {
119            Ok(req)
120        }
121
122        async fn shutdown(&self) {
123            self.1.set(self.1.get() + 1);
124        }
125    }
126
127    #[derive(Debug, Clone)]
128    struct Srv2(Rc<Cell<usize>>, Rc<Cell<usize>>);
129
130    impl Service<&'static str> for Srv2 {
131        type Response = (&'static str, &'static str);
132        type Error = ();
133
134        async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
135            self.0.set(self.0.get() + 1);
136            Ok(())
137        }
138
139        fn poll(&self, _: &mut Context<'_>) -> Result<(), Self::Error> {
140            self.0.set(self.0.get() + 1);
141            Ok(())
142        }
143
144        async fn call(
145            &self,
146            req: &'static str,
147            _: ServiceCtx<'_, Self>,
148        ) -> Result<Self::Response, ()> {
149            Ok((req, "srv2"))
150        }
151
152        async fn shutdown(&self) {
153            self.1.set(self.1.get() + 1);
154        }
155    }
156
157    #[ntex::test]
158    async fn test_ready() {
159        let cnt = Rc::new(Cell::new(0));
160        let cnt_sht = Rc::new(Cell::new(0));
161        let srv = chain(Box::new(Srv1(cnt.clone(), cnt_sht.clone())))
162            .clone()
163            .and_then(crate::boxed::service(Srv2(cnt.clone(), cnt_sht.clone())))
164            .into_pipeline();
165        let res = srv.ready().await;
166        assert_eq!(res, Ok(()));
167        assert_eq!(cnt.get(), 2);
168
169        lazy(|cx| srv.clone().poll(cx)).await.unwrap();
170        assert_eq!(cnt.get(), 4);
171
172        srv.shutdown().await;
173        assert_eq!(cnt_sht.get(), 2);
174
175        assert!(format!("{:?}", srv).contains("AndThen"));
176    }
177
178    #[ntex::test]
179    async fn test_ready2() {
180        let cnt = Rc::new(Cell::new(0));
181        let srv = Box::new(
182            chain(Srv1(cnt.clone(), Rc::new(Cell::new(0))))
183                .and_then(Srv2(cnt.clone(), Rc::new(Cell::new(0)))),
184        )
185        .into_pipeline();
186        let res = srv.ready().await;
187        assert_eq!(res, Ok(()));
188        assert_eq!(cnt.get(), 2);
189    }
190
191    #[ntex::test]
192    async fn test_call() {
193        let cnt = Rc::new(Cell::new(0));
194        let srv = chain(Box::new(Srv1(cnt.clone(), Rc::new(Cell::new(0)))))
195            .and_then(Srv2(cnt, Rc::new(Cell::new(0))))
196            .into_pipeline();
197        let res = srv.call("srv1").await;
198        assert!(res.is_ok());
199        assert_eq!(res.unwrap(), ("srv1", "srv2"));
200    }
201
202    #[ntex::test]
203    async fn test_factory() {
204        let cnt = Rc::new(Cell::new(0));
205        let cnt2 = cnt.clone();
206        let new_srv = chain_factory(fn_factory(move || {
207            let cnt = cnt2.clone();
208            async move { Ok::<_, ()>(Srv1(cnt, Rc::new(Cell::new(0)))) }
209        }))
210        .and_then(fn_factory(move || {
211            let cnt = cnt.clone();
212            async move { Ok(Srv2(cnt.clone(), Rc::new(Cell::new(0)))) }
213        }))
214        .clone();
215
216        let srv = new_srv.pipeline(&()).await.unwrap();
217        let res = srv.call("srv1").await;
218        assert!(res.is_ok());
219        assert_eq!(res.unwrap(), ("srv1", "srv2"));
220    }
221}