ntex_util/services/
inflight.rs1use ntex_service::{Middleware, Middleware2, Service, ServiceCtx};
3
4use super::counter::Counter;
5
/// Middleware configuration that caps the number of requests in flight.
///
/// The limit is handed to a `Counter` when this middleware wraps a service in
/// an `InFlightService`. The `Default` implementation uses a limit of 15.
#[derive(Clone, Copy, Debug)]
pub struct InFlight {
    /// Maximum number of requests allowed in flight at once.
    max_inflight: usize,
}
14
15impl InFlight {
16 pub fn new(max: usize) -> Self {
17 Self { max_inflight: max }
18 }
19}
20
21impl Default for InFlight {
22 fn default() -> Self {
23 Self::new(15)
24 }
25}
26
27impl<S> Middleware<S> for InFlight {
28 type Service = InFlightService<S>;
29
30 fn create(&self, service: S) -> Self::Service {
31 InFlightService {
32 service,
33 count: Counter::new(self.max_inflight),
34 }
35 }
36}
37
38impl<S, C> Middleware2<S, C> for InFlight {
39 type Service = InFlightService<S>;
40
41 fn create(&self, service: S, _: C) -> Self::Service {
42 InFlightService {
43 service,
44 count: Counter::new(self.max_inflight),
45 }
46 }
47}
48
49#[derive(Debug)]
50pub struct InFlightService<S> {
51 count: Counter,
52 service: S,
53}
54
55impl<S> InFlightService<S> {
56 pub fn new<R>(max: usize, service: S) -> Self
57 where
58 S: Service<R>,
59 {
60 Self {
61 service,
62 count: Counter::new(max),
63 }
64 }
65}
66
67impl<T, R> Service<R> for InFlightService<T>
68where
69 T: Service<R>,
70{
71 type Response = T::Response;
72 type Error = T::Error;
73
74 #[inline]
75 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
76 if !self.count.is_available() {
77 let (_, res) =
78 crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
79 res
80 } else {
81 ctx.ready(&self.service).await
82 }
83 }
84
85 #[inline]
86 async fn call(
87 &self,
88 req: R,
89 ctx: ServiceCtx<'_, Self>,
90 ) -> Result<Self::Response, Self::Error> {
91 let _guard = self.count.get();
92 ctx.call(&self.service, req).await
93 }
94
95 ntex_service::forward_poll!(service);
96 ntex_service::forward_shutdown!(service);
97}
98
#[cfg(test)]
mod tests {
    use std::{cell::RefCell, task::Poll, time::Duration};

    use ntex_service::{Pipeline, ServiceFactory, apply, apply2, fn_factory};

    use super::*;
    use crate::{channel::oneshot, future::lazy};

    /// Pause used to let the spawned call make progress before readiness is
    /// polled again.
    const SETTLE: Duration = Duration::from_millis(25);

    /// Inner service whose `call` waits on the wrapped oneshot receiver and
    /// ignores what `recv()` yields.
    struct SleepService(oneshot::Receiver<()>);

    impl Service<()> for SleepService {
        type Response = ();
        type Error = ();

        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
            let _ = self.0.recv().await;
            Ok(())
        }
    }

    /// A directly constructed `InFlightService` with a limit of 1 reports
    /// `Pending` while one call is outstanding and `Ready` again afterwards.
    #[ntex::test]
    async fn test_service() {
        let (release, blocked) = oneshot::channel();

        let svc = Pipeline::new(InFlightService::new(1, SleepService(blocked))).bind();
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Occupy the single slot with a call that waits on `release`.
        let worker = svc.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Pending);

        // Let the pending call finish, then expect readiness again.
        let _ = release.send(());
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));
        svc.shutdown().await;
    }

    /// Same scenario as `test_service`, with the service built through `apply`.
    #[ntex::test]
    async fn test_middleware() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (release, blocked) = oneshot::channel();
        // The factory closure hands the receiver out exactly once.
        let slot = RefCell::new(Some(blocked));
        let factory = apply(
            InFlight::new(1),
            fn_factory(move || {
                let receiver = slot.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(receiver)) }
            }),
        );

        let svc = factory.pipeline(&()).await.unwrap().bind();
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Occupy the single slot with a call that waits on `release`.
        let worker = svc.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Pending);

        // Let the pending call finish, then expect readiness again.
        let _ = release.send(());
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));
    }

    /// Same scenario as `test_service`, with the service built through `apply2`.
    #[ntex::test]
    async fn test_middleware2() {
        assert_eq!(InFlight::default().max_inflight, 15);
        assert_eq!(
            format!("{:?}", InFlight::new(1)),
            "InFlight { max_inflight: 1 }"
        );

        let (release, blocked) = oneshot::channel();
        // The factory closure hands the receiver out exactly once.
        let slot = RefCell::new(Some(blocked));
        let factory = apply2(
            InFlight::new(1),
            fn_factory(move || {
                let receiver = slot.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(receiver)) }
            }),
        );

        let svc = factory.pipeline(&()).await.unwrap().bind();
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Occupy the single slot with a call that waits on `release`.
        let worker = svc.clone();
        ntex::rt::spawn(async move {
            let _ = worker.call(()).await;
        });
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Pending);

        // Let the pending call finish, then expect readiness again.
        let _ = release.send(());
        crate::time::sleep(SETTLE).await;
        assert_eq!(lazy(|cx| svc.poll_ready(cx)).await, Poll::Ready(Ok(())));
    }
}