ntex_util/services/
inflight.rs

1//! Service that limits number of in-flight async requests.
2use ntex_service::{Middleware, Middleware2, Service, ServiceCtx};
3
4use super::counter::Counter;
5
6/// InFlight - service factory for service that can limit number of in-flight
7/// async requests.
8///
9/// Default number of in-flight requests is 15
10#[derive(Copy, Clone, Debug)]
11pub struct InFlight {
12    max_inflight: usize,
13}
14
15impl InFlight {
16    pub fn new(max: usize) -> Self {
17        Self { max_inflight: max }
18    }
19}
20
21impl Default for InFlight {
22    fn default() -> Self {
23        Self::new(15)
24    }
25}
26
27impl<S> Middleware<S> for InFlight {
28    type Service = InFlightService<S>;
29
30    fn create(&self, service: S) -> Self::Service {
31        InFlightService {
32            service,
33            count: Counter::new(self.max_inflight),
34        }
35    }
36}
37
38impl<S, C> Middleware2<S, C> for InFlight {
39    type Service = InFlightService<S>;
40
41    fn create(&self, service: S, _: C) -> Self::Service {
42        InFlightService {
43            service,
44            count: Counter::new(self.max_inflight),
45        }
46    }
47}
48
49#[derive(Debug)]
50pub struct InFlightService<S> {
51    count: Counter,
52    service: S,
53}
54
55impl<S> InFlightService<S> {
56    pub fn new<R>(max: usize, service: S) -> Self
57    where
58        S: Service<R>,
59    {
60        Self {
61            service,
62            count: Counter::new(max),
63        }
64    }
65}
66
67impl<T, R> Service<R> for InFlightService<T>
68where
69    T: Service<R>,
70{
71    type Response = T::Response;
72    type Error = T::Error;
73
74    #[inline]
75    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
76        if !self.count.is_available() {
77            let (_, res) =
78                crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
79            res
80        } else {
81            ctx.ready(&self.service).await
82        }
83    }
84
85    #[inline]
86    async fn call(
87        &self,
88        req: R,
89        ctx: ServiceCtx<'_, Self>,
90    ) -> Result<Self::Response, Self::Error> {
91        let _guard = self.count.get();
92        ctx.call(&self.service, req).await
93    }
94
95    ntex_service::forward_poll!(service);
96    ntex_service::forward_shutdown!(service);
97}
98
99#[cfg(test)]
100mod tests {
101    use std::{cell::RefCell, task::Poll, time::Duration};
102
103    use ntex_service::{Pipeline, ServiceFactory, apply, apply2, fn_factory};
104
105    use super::*;
106    use crate::{channel::oneshot, future::lazy};
107
108    struct SleepService(oneshot::Receiver<()>);
109
110    impl Service<()> for SleepService {
111        type Response = ();
112        type Error = ();
113
114        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
115            let _ = self.0.recv().await;
116            Ok(())
117        }
118    }
119
120    #[ntex::test]
121    async fn test_service() {
122        let (tx, rx) = oneshot::channel();
123
124        let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
125        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
126
127        let srv2 = srv.clone();
128        ntex::rt::spawn(async move {
129            let _ = srv2.call(()).await;
130        });
131        crate::time::sleep(Duration::from_millis(25)).await;
132        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
133
134        let _ = tx.send(());
135        crate::time::sleep(Duration::from_millis(25)).await;
136        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
137        srv.shutdown().await;
138    }
139
140    #[ntex::test]
141    async fn test_middleware() {
142        assert_eq!(InFlight::default().max_inflight, 15);
143        assert_eq!(
144            format!("{:?}", InFlight::new(1)),
145            "InFlight { max_inflight: 1 }"
146        );
147
148        let (tx, rx) = oneshot::channel();
149        let rx = RefCell::new(Some(rx));
150        let srv = apply(
151            InFlight::new(1),
152            fn_factory(move || {
153                let rx = rx.borrow_mut().take().unwrap();
154                async move { Ok::<_, ()>(SleepService(rx)) }
155            }),
156        );
157
158        let srv = srv.pipeline(&()).await.unwrap().bind();
159        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
160
161        let srv2 = srv.clone();
162        ntex::rt::spawn(async move {
163            let _ = srv2.call(()).await;
164        });
165        crate::time::sleep(Duration::from_millis(25)).await;
166        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
167
168        let _ = tx.send(());
169        crate::time::sleep(Duration::from_millis(25)).await;
170        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
171    }
172
173    #[ntex::test]
174    async fn test_middleware2() {
175        assert_eq!(InFlight::default().max_inflight, 15);
176        assert_eq!(
177            format!("{:?}", InFlight::new(1)),
178            "InFlight { max_inflight: 1 }"
179        );
180
181        let (tx, rx) = oneshot::channel();
182        let rx = RefCell::new(Some(rx));
183        let srv = apply2(
184            InFlight::new(1),
185            fn_factory(move || {
186                let rx = rx.borrow_mut().take().unwrap();
187                async move { Ok::<_, ()>(SleepService(rx)) }
188            }),
189        );
190
191        let srv = srv.pipeline(&()).await.unwrap().bind();
192        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
193
194        let srv2 = srv.clone();
195        ntex::rt::spawn(async move {
196            let _ = srv2.call(()).await;
197        });
198        crate::time::sleep(Duration::from_millis(25)).await;
199        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
200
201        let _ = tx.send(());
202        crate::time::sleep(Duration::from_millis(25)).await;
203        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
204    }
205}