ntex_util/services/
inflight.rs

//! Service that limits the number of in-flight async requests.
2use ntex_service::{Middleware, Service, ServiceCtx};
3
4use super::counter::Counter;
5
/// Middleware factory producing [`InFlightService`] wrappers that cap the
/// number of in-flight async requests.
///
/// The limit is stored in `max_inflight`; the `Default` implementation uses a
/// limit of 15.
#[derive(Clone, Copy, Debug)]
pub struct InFlight {
    /// Upper bound on concurrently executing requests.
    max_inflight: usize,
}
14
15impl InFlight {
16    pub fn new(max: usize) -> Self {
17        Self { max_inflight: max }
18    }
19}
20
21impl Default for InFlight {
22    fn default() -> Self {
23        Self::new(15)
24    }
25}
26
27impl<S, C> Middleware<S, C> for InFlight {
28    type Service = InFlightService<S>;
29
30    fn create(&self, service: S, _: C) -> Self::Service {
31        InFlightService {
32            service,
33            count: Counter::new(self.max_inflight),
34        }
35    }
36}
37
38#[derive(Debug)]
39pub struct InFlightService<S> {
40    count: Counter,
41    service: S,
42}
43
44impl<S> InFlightService<S> {
45    pub fn new<R>(max: usize, service: S) -> Self
46    where
47        S: Service<R>,
48    {
49        Self {
50            service,
51            count: Counter::new(max),
52        }
53    }
54}
55
56impl<T, R> Service<R> for InFlightService<T>
57where
58    T: Service<R>,
59{
60    type Response = T::Response;
61    type Error = T::Error;
62
63    #[inline]
64    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
65        if !self.count.is_available() {
66            crate::future::join(self.count.available(), ctx.ready(&self.service))
67                .await
68                .1
69        } else {
70            ctx.ready(&self.service).await
71        }
72    }
73
74    #[inline]
75    async fn call(
76        &self,
77        req: R,
78        ctx: ServiceCtx<'_, Self>,
79    ) -> Result<Self::Response, Self::Error> {
80        ctx.ready(self).await?;
81        let _guard = self.count.get();
82        ctx.call(&self.service, req).await
83    }
84
85    ntex_service::forward_poll!(service);
86    ntex_service::forward_shutdown!(service);
87}
88
#[cfg(test)]
mod tests {
    use std::{cell::Cell, cell::RefCell, rc::Rc, task::Poll, time::Duration};

    use async_channel as mpmc;
    use ntex_service::{Pipeline, ServiceFactory, apply, fn_factory};

    use super::*;
    use crate::{channel::oneshot, future::lazy};

    /// Pause inserted after each step so spawned tasks can make progress
    /// before readiness is sampled again.
    const SETTLE: Duration = Duration::from_millis(25);

    /// Polls `poll_ready` on the given service exactly once and evaluates to
    /// the resulting `Poll` value.
    macro_rules! ready_state {
        ($srv:expr) => {
            lazy(|cx| $srv.poll_ready(cx)).await
        };
    }

    /// Test service whose `call` finishes once `recv` on its channel returns.
    struct SleepService(mpmc::Receiver<()>);

    impl Service<()> for SleepService {
        type Response = ();
        type Error = ();

        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
            // The outcome of `recv` is irrelevant; it only delays completion.
            let _ = self.0.recv().await;
            Ok(())
        }
    }

    #[ntex::test]
    async fn test_service() {
        let (tx, rx) = mpmc::unbounded();
        let completed = Rc::new(Cell::new(0));

        let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
        assert_eq!(ready_state!(srv), Poll::Ready(Ok(())));

        // First request, issued with `call_nowait`.
        let done = Rc::clone(&completed);
        let first = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = first.await;
            done.set(done.get() + 1);
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Pending);

        // Second request, also issued with `call_nowait`.
        let done = Rc::clone(&completed);
        let second = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = second.await;
            done.set(done.get() + 1);
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Pending);

        // Third request goes through `call`; its completion is signalled
        // over a oneshot channel.
        let done = Rc::clone(&completed);
        let third = srv.call(());
        let (stx, srx) = oneshot::channel::<()>();
        ntex::rt::spawn(async move {
            let _ = third.await;
            done.set(done.get() + 1);
            let _ = stx.send(());
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Pending);

        // Release the pending requests one message at a time.
        let _ = tx.send(()).await;
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Ready(Ok(())));

        let _ = tx.send(()).await;
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Pending);

        let _ = tx.send(()).await;
        let _ = srx.recv().await;
        assert_eq!(completed.get(), 3);
        srv.shutdown().await;
    }

    /// Scenario shared by the middleware tests: checks the default limit and
    /// the `Debug` output, then verifies that a service built through the
    /// `InFlight` middleware with a limit of 1 is pending while one call is
    /// outstanding and ready again after that call is released.
    async fn check_middleware_limit() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (tx, rx) = mpmc::unbounded();
        // The factory closure hands the receiver out exactly once.
        let rx_slot = RefCell::new(Some(rx));
        let factory = apply(
            InFlight::new(1),
            fn_factory(move || {
                let rx = rx_slot.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(rx)) }
            }),
        );

        let srv = factory.pipeline(&()).await.unwrap().bind();
        assert_eq!(ready_state!(srv), Poll::Ready(Ok(())));

        let worker = srv.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Pending);

        let _ = tx.send(()).await;
        crate::time::sleep(SETTLE).await;
        assert_eq!(ready_state!(srv), Poll::Ready(Ok(())));
    }

    #[ntex::test]
    async fn test_middleware() {
        check_middleware_limit().await;
    }

    #[ntex::test]
    async fn test_middleware2() {
        check_middleware_limit().await;
    }
}