ntex_util/services/
onerequest.rs

1//! Service that limits number of in-flight async requests to 1.
2use std::{cell::Cell, future::poll_fn, task::Poll};
3
4use ntex_service::{Middleware, Service, ServiceCtx};
5
6use crate::task::LocalWaker;
7
/// Middleware that wraps a service so that at most one request is in
/// flight at any time.
///
/// Applying it to a service yields a [`OneRequestService`].
#[derive(Debug, Default, Clone, Copy)]
pub struct OneRequest;
12
13impl<S> Middleware<S> for OneRequest {
14    type Service = OneRequestService<S>;
15
16    fn create(&self, service: S) -> Self::Service {
17        OneRequestService {
18            service,
19            ready: Cell::new(true),
20            waker: LocalWaker::new(),
21        }
22    }
23}
24
25#[derive(Clone, Debug)]
26pub struct OneRequestService<S> {
27    waker: LocalWaker,
28    service: S,
29    ready: Cell<bool>,
30}
31
32impl<S> OneRequestService<S> {
33    pub fn new<R>(service: S) -> Self
34    where
35        S: Service<R>,
36    {
37        Self {
38            service,
39            ready: Cell::new(true),
40            waker: LocalWaker::new(),
41        }
42    }
43}
44
45impl<T, R> Service<R> for OneRequestService<T>
46where
47    T: Service<R>,
48{
49    type Response = T::Response;
50    type Error = T::Error;
51
52    #[inline]
53    async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
54        if !self.ready.get() {
55            poll_fn(|cx| {
56                self.waker.register(cx.waker());
57                if self.ready.get() {
58                    Poll::Ready(())
59                } else {
60                    Poll::Pending
61                }
62            })
63            .await
64        }
65        ctx.ready(&self.service).await
66    }
67
68    #[inline]
69    async fn call(
70        &self,
71        req: R,
72        ctx: ServiceCtx<'_, Self>,
73    ) -> Result<Self::Response, Self::Error> {
74        self.ready.set(false);
75
76        let result = ctx.call(&self.service, req).await;
77        self.ready.set(true);
78        self.waker.wake();
79        result
80    }
81
82    ntex_service::forward_poll!(service);
83    ntex_service::forward_shutdown!(service);
84}
85
#[cfg(test)]
mod tests {
    use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
    use std::{cell::RefCell, time::Duration};

    use super::*;
    use crate::{channel::oneshot, future::lazy};

    /// Test service whose `call` stays pending until the wrapped oneshot
    /// receiver resolves; the value yielded by the receiver is ignored.
    struct SleepService(oneshot::Receiver<()>);

    impl Service<()> for SleepService {
        type Response = ();
        type Error = ();

        async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
            let _ = self.0.recv().await;
            Ok::<_, ()>(())
        }
    }

    /// Exercises `OneRequestService::new` directly: the service is ready when
    /// idle, pending while a call is in flight, and ready again afterwards.
    #[ntex_macros::rt_test2]
    async fn test_oneshot() {
        let (tx, rx) = oneshot::channel();

        let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
        // No request has been issued yet, so the service must be ready.
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Start a call in the background; it blocks until `tx` fires.
        let srv2 = srv.clone();
        ntex::rt::spawn(async move {
            let _ = srv2.call(()).await;
        });
        // Give the spawned task time to start the call.
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Let the in-flight call finish; readiness must be restored.
        let _ = tx.send(());
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
        srv.shutdown().await;
    }

    /// Same scenario as `test_oneshot`, but the service is built through the
    /// `OneRequest` middleware applied to a service factory.
    #[ntex_macros::rt_test2]
    async fn test_middleware() {
        assert_eq!(format!("{OneRequest:?}"), "OneRequest");

        let (tx, rx) = oneshot::channel();
        // The receiver is moved out of the cell on the first factory call;
        // a second call would panic on `unwrap`.
        let rx = RefCell::new(Some(rx));
        let srv = apply(
            OneRequest,
            fn_factory(move || {
                let rx = rx.borrow_mut().take().unwrap();
                async move { Ok::<_, ()>(SleepService(rx)) }
            }),
        );

        let srv = srv.pipeline(&()).await.unwrap().bind();
        // Idle service is ready.
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));

        // Start a call in the background; it blocks until `tx` fires.
        let srv2 = srv.clone();
        ntex::rt::spawn(async move {
            let _ = srv2.call(()).await;
        });
        // Give the spawned task time to start the call.
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);

        // Let the in-flight call finish; readiness must be restored.
        let _ = tx.send(());
        crate::time::sleep(Duration::from_millis(25)).await;
        assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
    }
}
154}