ntex_util/services/
inflight.rs1use ntex_service::{Middleware, Service, ServiceCtx};
3
4use super::counter::Counter;
5
6#[derive(Copy, Clone, Debug)]
11pub struct InFlight {
12 max_inflight: usize,
13}
14
15impl InFlight {
16 pub fn new(max: usize) -> Self {
17 Self { max_inflight: max }
18 }
19}
20
21impl Default for InFlight {
22 fn default() -> Self {
23 Self::new(15)
24 }
25}
26
27impl<S> Middleware<S> for InFlight {
28 type Service = InFlightService<S>;
29
30 fn create(&self, service: S) -> Self::Service {
31 InFlightService {
32 service,
33 count: Counter::new(self.max_inflight),
34 }
35 }
36}
37
38#[derive(Debug)]
39pub struct InFlightService<S> {
40 count: Counter,
41 service: S,
42}
43
44impl<S> InFlightService<S> {
45 pub fn new<R>(max: usize, service: S) -> Self
46 where
47 S: Service<R>,
48 {
49 Self {
50 service,
51 count: Counter::new(max),
52 }
53 }
54}
55
56impl<T, R> Service<R> for InFlightService<T>
57where
58 T: Service<R>,
59{
60 type Response = T::Response;
61 type Error = T::Error;
62
63 #[inline]
64 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
65 if !self.count.is_available() {
66 let (_, res) =
67 crate::future::join(self.count.available(), ctx.ready(&self.service)).await;
68 res
69 } else {
70 ctx.ready(&self.service).await
71 }
72 }
73
74 #[inline]
75 async fn call(
76 &self,
77 req: R,
78 ctx: ServiceCtx<'_, Self>,
79 ) -> Result<Self::Response, Self::Error> {
80 let _guard = self.count.get();
81 ctx.call(&self.service, req).await
82 }
83
84 ntex_service::forward_poll!(service);
85 ntex_service::forward_shutdown!(service);
86}
87
88#[cfg(test)]
89mod tests {
90 use std::{cell::RefCell, task::Poll, time::Duration};
91
92 use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
93
94 use super::*;
95 use crate::{channel::oneshot, future::lazy};
96
97 struct SleepService(oneshot::Receiver<()>);
98
99 impl Service<()> for SleepService {
100 type Response = ();
101 type Error = ();
102
103 async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
104 let _ = self.0.recv().await;
105 Ok(())
106 }
107 }
108
109 #[ntex_macros::rt_test2]
110 async fn test_service() {
111 let (tx, rx) = oneshot::channel();
112
113 let srv = Pipeline::new(InFlightService::new(1, SleepService(rx))).bind();
114 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
115
116 let srv2 = srv.clone();
117 ntex::rt::spawn(async move {
118 let _ = srv2.call(()).await;
119 });
120 crate::time::sleep(Duration::from_millis(25)).await;
121 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
122
123 let _ = tx.send(());
124 crate::time::sleep(Duration::from_millis(25)).await;
125 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
126 srv.shutdown().await;
127 }
128
129 #[ntex_macros::rt_test2]
130 async fn test_middleware() {
131 assert_eq!(InFlight::default().max_inflight, 15);
132 assert_eq!(
133 format!("{:?}", InFlight::new(1)),
134 "InFlight { max_inflight: 1 }"
135 );
136
137 let (tx, rx) = oneshot::channel();
138 let rx = RefCell::new(Some(rx));
139 let srv = apply(
140 InFlight::new(1),
141 fn_factory(move || {
142 let rx = rx.borrow_mut().take().unwrap();
143 async move { Ok::<_, ()>(SleepService(rx)) }
144 }),
145 );
146
147 let srv = srv.pipeline(&()).await.unwrap().bind();
148 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
149
150 let srv2 = srv.clone();
151 ntex::rt::spawn(async move {
152 let _ = srv2.call(()).await;
153 });
154 crate::time::sleep(Duration::from_millis(25)).await;
155 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
156
157 let _ = tx.send(());
158 crate::time::sleep(Duration::from_millis(25)).await;
159 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
160 }
161}