ntex_util/services/
inflight.rs1use ntex_service::{Middleware, Service, ServiceCtx};
3
4use super::counter::Counter;
5
6#[derive(Copy, Clone, Debug)]
11pub struct InFlight {
12 max_inflight: usize,
13}
14
15impl InFlight {
16 pub fn new(max: usize) -> Self {
17 Self { max_inflight: max }
18 }
19}
20
21impl Default for InFlight {
22 fn default() -> Self {
23 Self::new(15)
24 }
25}
26
27impl<S> Middleware<S> for InFlight {
28 type Service = InFlightService<S>;
29
30 fn create(&self, service: S) -> Self::Service {
31 InFlightService {
32 service,
33 count: Counter::new(self.max_inflight),
34 }
35 }
36}
37
38#[derive(Debug)]
39pub struct InFlightService<S> {
40 count: Counter,
41 service: S,
42}
43
44impl<S> InFlightService<S> {
45 pub fn new<R>(max: usize, service: S) -> Self
46 where
47 S: Service<R>,
48 {
49 Self {
50 service,
51 count: Counter::new(max),
52 }
53 }
54}
55
56impl<T, R> Service<R> for InFlightService<T>
57where
58 T: Service<R>,
59{
60 type Response = T::Response;
61 type Error = T::Error;
62
63 #[inline]
64 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
65 if !self.count.is_available() {
66 let (_, res) =
67 crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
68 res
69 } else {
70 ctx.ready(&self.service).await
71 }
72 }
73
74 #[inline]
75 async fn call(
76 &self,
77 req: R,
78 ctx: ServiceCtx<'_, Self>,
79 ) -> Result<Self::Response, Self::Error> {
80 ctx.ready(self).await?;
81 let _guard = self.count.get();
82 ctx.call(&self.service, req).await
83 }
84
85 ntex_service::forward_poll!(service);
86 ntex_service::forward_shutdown!(service);
87}
88
89#[cfg(test)]
90mod tests {
91 use std::{cell::RefCell, task::Poll, time::Duration};
92
93 use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
94
95 use super::*;
96 use crate::{channel::oneshot, future::lazy};
97
98 struct SleepService(oneshot::Receiver<()>);
99
100 impl Service<()> for SleepService {
101 type Response = ();
102 type Error = ();
103
104 async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
105 let _ = self.0.recv().await;
106 Ok(())
107 }
108 }
109
110 #[ntex_macros::rt_test2]
111 async fn test_service() {
112 let (tx, rx) = oneshot::channel();
113
114 let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
115 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
116
117 let srv2 = srv.clone();
118 ntex::rt::spawn(async move {
119 let _ = srv2.call(()).await;
120 });
121 crate::time::sleep(Duration::from_millis(25)).await;
122 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
123
124 let _ = tx.send(());
125 crate::time::sleep(Duration::from_millis(25)).await;
126 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
127 srv.shutdown().await;
128 }
129
130 #[ntex_macros::rt_test2]
131 async fn test_middleware() {
132 assert_eq!(InFlight::default().max_inflight, 15);
133 assert_eq!(
134 format!("{:?}", InFlight::new(1)),
135 "InFlight { max_inflight: 1 }"
136 );
137
138 let (tx, rx) = oneshot::channel();
139 let rx = RefCell::new(Some(rx));
140 let srv = apply(
141 InFlight::new(1),
142 fn_factory(move || {
143 let rx = rx.borrow_mut().take().unwrap();
144 async move { Ok::<_, ()>(SleepService(rx)) }
145 }),
146 );
147
148 let srv = srv.pipeline(&()).await.unwrap().bind();
149 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
150
151 let srv2 = srv.clone();
152 ntex::rt::spawn(async move {
153 let _ = srv2.call(()).await;
154 });
155 crate::time::sleep(Duration::from_millis(25)).await;
156 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
157
158 let _ = tx.send(());
159 crate::time::sleep(Duration::from_millis(25)).await;
160 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
161 }
162}