ntex_util/services/
onerequest.rs1use std::{cell::Cell, future::poll_fn, task::Poll};
3
4use ntex_service::{Middleware, Service, ServiceCtx};
5
6use crate::task::LocalWaker;
7
8#[derive(Copy, Clone, Default, Debug)]
11pub struct OneRequest;
12
13impl<S> Middleware<S> for OneRequest {
14 type Service = OneRequestService<S>;
15
16 fn create(&self, service: S) -> Self::Service {
17 OneRequestService {
18 service,
19 ready: Cell::new(true),
20 waker: LocalWaker::new(),
21 }
22 }
23}
24
25#[derive(Clone, Debug)]
26pub struct OneRequestService<S> {
27 waker: LocalWaker,
28 service: S,
29 ready: Cell<bool>,
30}
31
32impl<S> OneRequestService<S> {
33 pub fn new<R>(service: S) -> Self
34 where
35 S: Service<R>,
36 {
37 Self {
38 service,
39 ready: Cell::new(true),
40 waker: LocalWaker::new(),
41 }
42 }
43}
44
45impl<T, R> Service<R> for OneRequestService<T>
46where
47 T: Service<R>,
48{
49 type Response = T::Response;
50 type Error = T::Error;
51
52 #[inline]
53 async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
54 if !self.ready.get() {
55 poll_fn(|cx| {
56 self.waker.register(cx.waker());
57 if self.ready.get() {
58 Poll::Ready(())
59 } else {
60 Poll::Pending
61 }
62 })
63 .await
64 }
65 ctx.ready(&self.service).await
66 }
67
68 #[inline]
69 async fn call(
70 &self,
71 req: R,
72 ctx: ServiceCtx<'_, Self>,
73 ) -> Result<Self::Response, Self::Error> {
74 self.ready.set(false);
75
76 let result = ctx.call(&self.service, req).await;
77 self.ready.set(true);
78 self.waker.wake();
79 result
80 }
81
82 ntex_service::forward_poll!(service);
83 ntex_service::forward_shutdown!(service);
84}
85
86#[cfg(test)]
87mod tests {
88 use ntex_service::{apply, fn_factory, Pipeline, ServiceFactory};
89 use std::{cell::RefCell, time::Duration};
90
91 use super::*;
92 use crate::{channel::oneshot, future::lazy};
93
94 struct SleepService(oneshot::Receiver<()>);
95
96 impl Service<()> for SleepService {
97 type Response = ();
98 type Error = ();
99
100 async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
101 let _ = self.0.recv().await;
102 Ok::<_, ()>(())
103 }
104 }
105
106 #[ntex_macros::rt_test2]
107 async fn test_oneshot() {
108 let (tx, rx) = oneshot::channel();
109
110 let srv = Pipeline::new(OneRequestService::new(SleepService(rx))).bind();
111 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
112
113 let srv2 = srv.clone();
114 ntex::rt::spawn(async move {
115 let _ = srv2.call(()).await;
116 });
117 crate::time::sleep(Duration::from_millis(25)).await;
118 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
119
120 let _ = tx.send(());
121 crate::time::sleep(Duration::from_millis(25)).await;
122 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
123 srv.shutdown().await;
124 }
125
126 #[ntex_macros::rt_test2]
127 async fn test_middleware() {
128 assert_eq!(format!("{OneRequest:?}"), "OneRequest");
129
130 let (tx, rx) = oneshot::channel();
131 let rx = RefCell::new(Some(rx));
132 let srv = apply(
133 OneRequest,
134 fn_factory(move || {
135 let rx = rx.borrow_mut().take().unwrap();
136 async move { Ok::<_, ()>(SleepService(rx)) }
137 }),
138 );
139
140 let srv = srv.pipeline(&()).await.unwrap().bind();
141 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
142
143 let srv2 = srv.clone();
144 ntex::rt::spawn(async move {
145 let _ = srv2.call(()).await;
146 });
147 crate::time::sleep(Duration::from_millis(25)).await;
148 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
149
150 let _ = tx.send(());
151 crate::time::sleep(Duration::from_millis(25)).await;
152 assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
153 }
154}