ntex_util/services/
inflight.rs

1//! Service that limits number of in-flight async requests.
2use ntex_service::{Middleware, Service, ServiceCtx};
3
4use super::counter::Counter;
5
6/// InFlight - service factory for service that can limit number of in-flight
7/// async requests.
8///
9/// Default number of in-flight requests is 15
10#[derive(Copy, Clone, Debug)]
11pub struct InFlight {
12    max_inflight: usize,
13}
14
15impl InFlight {
16    pub fn new(max: usize) -> Self {
17        Self { max_inflight: max }
18    }
19}
20
21impl Default for InFlight {
22    fn default() -> Self {
23        Self::new(15)
24    }
25}
26
27impl<S> Middleware<S> for InFlight {
28    type Service = InFlightService<S>;
29
30    fn create(&self, service: S) -> Self::Service {
31        InFlightService {
32            service,
33            count: Counter::new(self.max_inflight),
34        }
35    }
36}
37
38#[derive(Debug)]
39pub struct InFlightService<S> {
40    count: Counter,
41    service: S,
42}
43
44impl<S> InFlightService<S> {
45    pub fn new<R>(max: usize, service: S) -> Self
46    where
47        S: Service<R>,
48    {
49        Self {
50            service,
51            count: Counter::new(max),
52        }
53    }
54}
55
56impl<T, R> Service<R> for InFlightService<T>
57where
58    T: Service<R>,
59{
60    type Response = T::Response;
61    type Error = T::Error;
62
63    #[inline]
64    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
65        if !self.count.is_available() {
66            let (_, res) =
67                crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
68            res
69        } else {
70            ctx.ready(&self.service).await
71        }
72    }
73
74    #[inline]
75    async fn call(
76        &self,
77        req: R,
78        ctx: ServiceCtx<'_, Self>,
79    ) -> Result<Self::Response, Self::Error> {
80        let _guard = self.count.get();
81        ctx.call(&self.service, req).await
82    }
83
84    ntex_service::forward_poll!(service);
85    ntex_service::forward_shutdown!(service);
86}
87
88#[cfg(test)]
89mod tests {
90    use std::{cell::RefCell, task::Poll, time::Duration};
91
92    use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
93
94    use super::*;
95    use crate::{channel::oneshot, future::lazy};
96
97    struct SleepService(oneshot::Receiver<()>);
98
99    impl Service<()> for SleepService {
100        type Response = ();
101        type Error = ();
102
103        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
104            let _ = self.0.recv().await;
105            Ok(())
106        }
107    }
108
109    #[ntex_macros::rt_test2]
110    async fn test_service() {
111        let (tx, rx) = oneshot::channel();
112
113        let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
114        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
115
116        let srv2 = srv.clone();
117        ntex::rt::spawn(async move {
118            let _ = srv2.call(()).await;
119        });
120        crate::time::sleep(Duration::from_millis(25)).await;
121        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
122
123        let _ = tx.send(());
124        crate::time::sleep(Duration::from_millis(25)).await;
125        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
126        srv.shutdown().await;
127    }
128
129    #[ntex_macros::rt_test2]
130    async fn test_middleware() {
131        assert_eq!(InFlight::default().max_inflight, 15);
132        assert_eq!(
133            format!("{:?}", InFlight::new(1)),
134            "InFlight { max_inflight: 1 }"
135        );
136
137        let (tx, rx) = oneshot::channel();
138        let rx = RefCell::new(Some(rx));
139        let srv = apply(
140            InFlight::new(1),
141            fn_factory(move || {
142                let rx = rx.borrow_mut().take().unwrap();
143                async move { Ok::<_, ()>(SleepService(rx)) }
144            }),
145        );
146
147        let srv = srv.pipeline(&()).await.unwrap().bind();
148        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
149
150        let srv2 = srv.clone();
151        ntex::rt::spawn(async move {
152            let _ = srv2.call(()).await;
153        });
154        crate::time::sleep(Duration::from_millis(25)).await;
155        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
156
157        let _ = tx.send(());
158        crate::time::sleep(Duration::from_millis(25)).await;
159        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
160    }
161}