ntex_util/services/
inflight.rs

//! Service that limits the number of in-flight async requests.
2use ntex_service::{Middleware, Middleware2, Service, ServiceCtx};
3
4use super::counter::Counter;
5
/// Middleware factory producing [`InFlightService`], a service wrapper that
/// limits the number of in-flight async requests.
///
/// The default number of in-flight requests is 15.
#[derive(Copy, Clone, Debug)]
pub struct InFlight {
    /// Limit handed to the counter of every service this factory creates.
    max_inflight: usize,
}

impl InFlight {
    /// Creates a factory whose services are configured with `max` as the
    /// in-flight request limit.
    ///
    /// This is a `const fn`, so a limit can be declared in a `const` or
    /// `static` item.
    pub const fn new(max: usize) -> Self {
        Self { max_inflight: max }
    }
}

impl Default for InFlight {
    /// Returns a factory with the default limit of 15 in-flight requests.
    fn default() -> Self {
        Self::new(15)
    }
}
26
27impl<S> Middleware<S> for InFlight {
28    type Service = InFlightService<S>;
29
30    fn create(&self, service: S) -> Self::Service {
31        InFlightService {
32            service,
33            count: Counter::new(self.max_inflight),
34        }
35    }
36}
37
38impl<S, C> Middleware2<S, C> for InFlight {
39    type Service = InFlightService<S>;
40
41    fn create(&self, service: S, _: C) -> Self::Service {
42        InFlightService {
43            service,
44            count: Counter::new(self.max_inflight),
45        }
46    }
47}
48
49#[derive(Debug)]
50pub struct InFlightService<S> {
51    count: Counter,
52    service: S,
53}
54
55impl<S> InFlightService<S> {
56    pub fn new<R>(max: usize, service: S) -> Self
57    where
58        S: Service<R>,
59    {
60        Self {
61            service,
62            count: Counter::new(max),
63        }
64    }
65}
66
67impl<T, R> Service<R> for InFlightService<T>
68where
69    T: Service<R>,
70{
71    type Response = T::Response;
72    type Error = T::Error;
73
74    #[inline]
75    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
76        if !self.count.is_available() {
77            crate::future::join(self.count.available(), ctx.ready(&self.service))
78                .await
79                .1
80        } else {
81            ctx.ready(&self.service).await
82        }
83    }
84
85    #[inline]
86    async fn call(
87        &self,
88        req: R,
89        ctx: ServiceCtx<'_, Self>,
90    ) -> Result<Self::Response, Self::Error> {
91        ctx.ready(self).await?;
92        let _guard = self.count.get();
93        ctx.call(&self.service, req).await
94    }
95
96    ntex_service::forward_poll!(service);
97    ntex_service::forward_shutdown!(service);
98}
99
#[cfg(test)]
mod tests {
    use std::{
        cell::{Cell, RefCell},
        rc::Rc,
        task::Poll,
        time::Duration,
    };

    use async_channel as mpmc;
    use ntex_service::{Pipeline, ServiceFactory, apply, apply2, fn_factory};

    use super::*;
    use crate::{channel::oneshot, future::lazy};

    /// Pause inserted between test steps so spawned tasks get a chance to run.
    const TICK: Duration = Duration::from_millis(25);

    /// Polls `$srv` for readiness exactly once and compares the outcome with
    /// `$expected`.
    macro_rules! assert_poll_ready {
        ($srv:expr, $expected:expr) => {
            assert_eq!(lazy(|cx| $srv.poll_ready(cx)).await, $expected)
        };
    }

    /// Test service whose `call` finishes only after `recv` on the wrapped
    /// channel returns.
    struct SleepService(mpmc::Receiver<()>);

    impl Service<()> for SleepService {
        type Response = ();
        type Error = ();

        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
            let Self(signal) = self;
            let _ = signal.recv().await;
            Ok(())
        }
    }

    #[ntex::test]
    async fn test_service() {
        let (tx, rx) = mpmc::unbounded();
        let finished = Rc::new(Cell::new(0));

        let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
        assert_poll_ready!(srv, Poll::Ready(Ok(())));

        // First request, issued through `call_nowait`.
        let done = Rc::clone(&finished);
        let fut = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = fut.await;
            done.set(done.get() + 1);
        });
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        // Second request, also issued through `call_nowait`.
        let done = Rc::clone(&finished);
        let fut = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = fut.await;
            done.set(done.get() + 1);
        });
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        // Third request goes through `call`; its completion is signalled on
        // a oneshot channel so the test can wait for it at the end.
        let done = Rc::clone(&finished);
        let fut = srv.call(());
        let (stx, srx) = oneshot::channel::<()>();
        ntex::rt::spawn(async move {
            let _ = fut.await;
            done.set(done.get() + 1);
            let _ = stx.send(());
        });
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        // Each message sent on `tx` lets one `SleepService::call` return.
        let _ = tx.send(()).await;
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Ready(Ok(())));

        let _ = tx.send(()).await;
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        let _ = tx.send(()).await;
        let _ = srx.recv().await;
        assert_eq!(finished.get(), 3);
        srv.shutdown().await;
    }

    #[ntex::test]
    async fn test_middleware() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (tx, rx) = mpmc::unbounded();
        // The factory closure runs by shared reference, so the receiver is
        // handed over through a take-once slot.
        let rx_slot = RefCell::new(Some(rx));
        let factory = apply(
            InFlight::new(1),
            fn_factory(move || {
                let receiver = rx_slot.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(receiver)) }
            }),
        );

        let srv = factory.pipeline(&()).await.unwrap().bind();
        assert_poll_ready!(srv, Poll::Ready(Ok(())));

        let worker = srv.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        let _ = tx.send(()).await;
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Ready(Ok(())));
    }

    #[ntex::test]
    async fn test_middleware2() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (tx, rx) = mpmc::unbounded();
        // The factory closure runs by shared reference, so the receiver is
        // handed over through a take-once slot.
        let rx_slot = RefCell::new(Some(rx));
        let factory = apply2(
            InFlight::new(1),
            fn_factory(move || {
                let receiver = rx_slot.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(receiver)) }
            }),
        );

        let srv = factory.pipeline(&()).await.unwrap().bind();
        assert_poll_ready!(srv, Poll::Ready(Ok(())));

        let worker = srv.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Pending);

        let _ = tx.send(()).await;
        crate::time::sleep(TICK).await;
        assert_poll_ready!(srv, Poll::Ready(Ok(())));
    }
}