ntex_util/services/
inflight.rs1use ntex_service::{Middleware, Middleware2, Service, ServiceCtx};
3
4use super::counter::Counter;
5
/// Middleware configuration that carries an in-flight request limit.
///
/// The stored limit is used as the capacity of the `Counter` attached to
/// every service this middleware wraps.
#[derive(Debug, Clone, Copy)]
pub struct InFlight {
    /// Capacity handed to `Counter::new` for each wrapped service.
    max_inflight: usize,
}
14
15impl InFlight {
16 pub fn new(max: usize) -> Self {
17 Self { max_inflight: max }
18 }
19}
20
21impl Default for InFlight {
22 fn default() -> Self {
23 Self::new(15)
24 }
25}
26
27impl<S> Middleware<S> for InFlight {
28 type Service = InFlightService<S>;
29
30 fn create(&self, service: S) -> Self::Service {
31 InFlightService {
32 service,
33 count: Counter::new(self.max_inflight),
34 }
35 }
36}
37
38impl<S, C> Middleware2<S, C> for InFlight {
39 type Service = InFlightService<S>;
40
41 fn create(&self, service: S, _: C) -> Self::Service {
42 InFlightService {
43 service,
44 count: Counter::new(self.max_inflight),
45 }
46 }
47}
48
/// Service wrapper that pairs an inner service with an in-flight counter.
///
/// Readiness and calls are routed through the counter by the `Service`
/// implementation for this type.
#[derive(Debug)]
pub struct InFlightService<S> {
    /// Counter consulted before reporting readiness and held during calls.
    count: Counter,
    /// The wrapped inner service.
    service: S,
}
54
55impl<S> InFlightService<S> {
56 pub fn new<R>(max: usize, service: S) -> Self
57 where
58 S: Service<R>,
59 {
60 Self {
61 service,
62 count: Counter::new(max),
63 }
64 }
65}
66
67impl<T, R> Service<R> for InFlightService<T>
68where
69 T: Service<R>,
70{
71 type Response = T::Response;
72 type Error = T::Error;
73
74 #[inline]
75 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
76 if !self.count.is_available() {
77 crate::future::join(self.count.available(), ctx.ready(&self.service))
78 .await
79 .1
80 } else {
81 ctx.ready(&self.service).await
82 }
83 }
84
85 #[inline]
86 async fn call(
87 &self,
88 req: R,
89 ctx: ServiceCtx<'_, Self>,
90 ) -> Result<Self::Response, Self::Error> {
91 ctx.ready(self).await?;
92 let _guard = self.count.get();
93 ctx.call(&self.service, req).await
94 }
95
96 ntex_service::forward_poll!(service);
97 ntex_service::forward_shutdown!(service);
98}
99
// Tests for `InFlightService` used directly and through the `InFlight`
// middleware (`apply` and `apply2`).
#[cfg(test)]
mod tests {
    use std::{cell::Cell, cell::RefCell, rc::Rc, task::Poll, time::Duration};

    use async_channel as mpmc;
    use ntex_service::{Pipeline, ServiceFactory, apply, apply2, fn_factory};

    use super::*;
    use crate::{channel::oneshot, future::lazy};

    // Test service whose `call` completes only after a message is received
    // from the wrapped channel, so the test controls when calls finish.
    struct SleepService(mpmc::Receiver<()>);

    impl Service<()> for SleepService {
        type Response = ();
        type Error = ();

        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
            // The receive result is ignored; the call always returns `Ok`.
            let _ = self.0.recv().await;
            Ok(())
        }
    }

    // Drives three calls through a limit-1 `InFlightService` and checks the
    // readiness reported between each step.
    #[ntex::test]
    async fn test_service() {
        let (tx, rx) = mpmc::unbounded();
        // Counts how many spawned calls have completed.
        let counter = Rc::new(Cell::new(0));

        let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
        // Nothing is in flight yet.
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // First call, started with `call_nowait`.
        let counter2 = counter.clone();
        let fut = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = fut.await;
            counter2.set(counter2.get() + 1);
        });
        // Give the spawned task time to run before checking readiness.
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Second call, also started with `call_nowait`.
        let counter2 = counter.clone();
        let fut = srv.call_nowait(());
        ntex::rt::spawn(async move {
            let _ = fut.await;
            counter2.set(counter2.get() + 1);
        });
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Third call uses `call`; its completion is signalled through `stx`.
        let counter2 = counter.clone();
        let fut = srv.call(());
        let (stx, srx) = oneshot::channel::<()>();
        ntex::rt::spawn(async move {
            let _ = fut.await;
            counter2.set(counter2.get() + 1);
            let _ = stx.send(());
        });
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Each message sent on `tx` lets one pending `SleepService::call`
        // return; readiness is re-checked after each of the first two sends.
        let _ = tx.send(()).await;
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        let _ = tx.send(()).await;
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Release the last call, wait for the third task's signal, and
        // verify that all three calls completed.
        let _ = tx.send(()).await;
        let _ = srx.recv().await;
        assert_eq!(counter.get(), 3);
        srv.shutdown().await;
    }

    // Checks the `InFlight` defaults and `Debug` output, then exercises the
    // middleware through `apply` with a limit of one.
    #[ntex::test]
    async fn test_middleware() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (tx, rx) = mpmc::unbounded();
        // The factory closure takes the receiver out of the cell; a second
        // invocation would panic on `unwrap`.
        let rx = RefCell::new(Some(rx));
        let srv = apply(
            InFlight::new(1),
            fn_factory(move || {
                let rx = rx.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(rx)) }
            }),
        );

        let srv = srv.pipeline(&()).await.unwrap().bind();
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Start one call on a cloned handle; it blocks inside `SleepService`.
        let srv2 = srv.clone();
        ntex::rt::spawn(async move {
            let _ = srv2.call(()).await;
        });
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Let the call finish; readiness is expected to be reported again.
        let _ = tx.send(()).await;
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
    }

    // Same scenario as `test_middleware`, but wired through `apply2`.
    #[ntex::test]
    async fn test_middleware2() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (tx, rx) = mpmc::unbounded();
        // The factory closure takes the receiver out of the cell; a second
        // invocation would panic on `unwrap`.
        let rx = RefCell::new(Some(rx));
        let srv = apply2(
            InFlight::new(1),
            fn_factory(move || {
                let rx = rx.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(rx)) }
            }),
        );

        let srv = srv.pipeline(&()).await.unwrap().bind();
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Start one call on a cloned handle; it blocks inside `SleepService`.
        let srv2 = srv.clone();
        ntex::rt::spawn(async move {
            let _ = srv2.call(()).await;
        });
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Let the call finish; readiness is expected to be reported again.
        let _ = tx.send(()).await;
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
    }
}