ntex_util/services/
inflight.rs1use ntex_service::{Middleware, Service, ServiceCtx};
3
4use super::counter::Counter;
5
6#[derive(Copy, Clone, Debug)]
11pub struct InFlight {
12 max_inflight: usize,
13}
14
15impl InFlight {
16 pub fn new(max: usize) -> Self {
17 Self { max_inflight: max }
18 }
19}
20
21impl Default for InFlight {
22 fn default() -> Self {
23 Self::new(15)
24 }
25}
26
27impl<S, C> Middleware<S, C> for InFlight {
28 type Service = InFlightService<S>;
29
30 fn create(&self, service: S, _: C) -> Self::Service {
31 InFlightService {
32 service,
33 count: Counter::new(self.max_inflight),
34 }
35 }
36}
37
38#[derive(Debug)]
39pub struct InFlightService<S> {
40 count: Counter,
41 service: S,
42}
43
44impl<S> InFlightService<S> {
45 pub fn new<R>(max: usize, service: S) -> Self
46 where
47 S: Service<R>,
48 {
49 Self {
50 service,
51 count: Counter::new(max),
52 }
53 }
54}
55
56impl<T, R> Service<R> for InFlightService<T>
57where
58 T: Service<R>,
59{
60 type Response = T::Response;
61 type Error = T::Error;
62
63 #[inline]
64 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
65 if !self.count.is_available() {
66 crate::future::join(self.count.available(), ctx.ready(&self.service))
67 .await
68 .1
69 } else {
70 ctx.ready(&self.service).await
71 }
72 }
73
74 #[inline]
75 async fn call(
76 &self,
77 req: R,
78 ctx: ServiceCtx<'_, Self>,
79 ) -> Result<Self::Response, Self::Error> {
80 ctx.ready(self).await?;
81 let _guard = self.count.get();
82 ctx.call(&self.service, req).await
83 }
84
85 ntex_service::forward_poll!(service);
86 ntex_service::forward_shutdown!(service);
87}
88
89#[cfg(test)]
90mod tests {
91 use std::{cell::Cell, cell::RefCell, rc::Rc, task::Poll, time::Duration};
92
93 use async_channel as mpmc;
94 use ntex_service::{Pipeline, ServiceFactory, apply, fn_factory};
95
96 use super::*;
97 use crate::{channel::oneshot, future::lazy};
98
99 struct SleepService(mpmc::Receiver<()>);
100
101 impl Service<()> for SleepService {
102 type Response = ();
103 type Error = ();
104
105 async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
106 let _ = self.0.recv().await;
107 Ok(())
108 }
109 }
110
111 #[ntex::test]
112 async fn test_service() {
113 let (tx, rx) = mpmc::unbounded();
114 let counter = Rc::new(Cell::new(0));
115
116 let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
117 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
118
119 let counter2 = counter.clone();
120 let fut = srv.call_nowait(());
121 ntex::rt::spawn(async move {
122 let _ = fut.await;
123 counter2.set(counter2.get() + 1);
124 });
125 crate::time::sleep(Duration::from_millis(25)).await;
126 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
127
128 let counter2 = counter.clone();
129 let fut = srv.call_nowait(());
130 ntex::rt::spawn(async move {
131 let _ = fut.await;
132 counter2.set(counter2.get() + 1);
133 });
134 crate::time::sleep(Duration::from_millis(25)).await;
135 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
136
137 let counter2 = counter.clone();
138 let fut = srv.call(());
139 let (stx, srx) = oneshot::channel::<()>();
140 ntex::rt::spawn(async move {
141 let _ = fut.await;
142 counter2.set(counter2.get() + 1);
143 let _ = stx.send(());
144 });
145 crate::time::sleep(Duration::from_millis(25)).await;
146 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
147
148 let _ = tx.send(()).await;
149 crate::time::sleep(Duration::from_millis(25)).await;
150 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
151
152 let _ = tx.send(()).await;
153 crate::time::sleep(Duration::from_millis(25)).await;
154 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
155
156 let _ = tx.send(()).await;
157 let _ = srx.recv().await;
158 assert_eq!(counter.get(), 3);
159 srv.shutdown().await;
160 }
161
162 #[ntex::test]
163 async fn test_middleware() {
164 assert_eq!(InFlight::default().max_inflight, 15);
165 assert_eq!(
166 format!("{:?}", InFlight::new(1)),
167 "InFlight { max_inflight: 1 }"
168 );
169
170 let (tx, rx) = mpmc::unbounded();
171 let rx = RefCell::new(Some(rx));
172 let srv = apply(
173 InFlight::new(1),
174 fn_factory(move || {
175 let rx = rx.borrow_mut().take().unwrap();
176 async move { Ok::<_, ()>(SleepService(rx)) }
177 }),
178 );
179
180 let srv = srv.pipeline(&()).await.unwrap().bind();
181 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
182
183 let srv2 = srv.clone();
184 ntex::rt::spawn(async move {
185 let _ = srv2.call(()).await;
186 });
187 crate::time::sleep(Duration::from_millis(25)).await;
188 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
189
190 let _ = tx.send(()).await;
191 crate::time::sleep(Duration::from_millis(25)).await;
192 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
193 }
194
195 #[ntex::test]
196 async fn test_middleware2() {
197 assert_eq!(InFlight::default().max_inflight, 15);
198 assert_eq!(
199 format!("{:?}", InFlight::new(1)),
200 "InFlight { max_inflight: 1 }"
201 );
202
203 let (tx, rx) = mpmc::unbounded();
204 let rx = RefCell::new(Some(rx));
205 let srv = apply(
206 InFlight::new(1),
207 fn_factory(move || {
208 let rx = rx.borrow_mut().take().unwrap();
209 async move { Ok::<_, ()>(SleepService(rx)) }
210 }),
211 );
212
213 let srv = srv.pipeline(&()).await.unwrap().bind();
214 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
215
216 let srv2 = srv.clone();
217 ntex::rt::spawn(async move {
218 let _ = srv2.call(()).await;
219 });
220 crate::time::sleep(Duration::from_millis(25)).await;
221 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
222
223 let _ = tx.send(()).await;
224 crate::time::sleep(Duration::from_millis(25)).await;
225 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
226 }
227}