requiem_utils/
keepalive.rs

1use std::convert::Infallible;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use requiem_rt::time::{delay_until, Delay, Instant};
9use requiem_service::{Service, ServiceFactory};
10use futures::future::{ok, Ready};
11
12use super::time::{LowResTime, LowResTimeService};
13
14pub struct KeepAlive<R, E, F> {
15    f: F,
16    ka: Duration,
17    time: LowResTime,
18    _t: PhantomData<(R, E)>,
19}
20
21impl<R, E, F> KeepAlive<R, E, F>
22where
23    F: Fn() -> E + Clone,
24{
25    pub fn new(ka: Duration, time: LowResTime, f: F) -> Self {
26        KeepAlive {
27            f,
28            ka,
29            time,
30            _t: PhantomData,
31        }
32    }
33}
34
35impl<R, E, F> Clone for KeepAlive<R, E, F>
36where
37    F: Clone,
38{
39    fn clone(&self) -> Self {
40        KeepAlive {
41            f: self.f.clone(),
42            ka: self.ka,
43            time: self.time.clone(),
44            _t: PhantomData,
45        }
46    }
47}
48
49impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
50where
51    F: Fn() -> E + Clone,
52{
53    type Request = R;
54    type Response = R;
55    type Error = E;
56    type InitError = Infallible;
57    type Config = ();
58    type Service = KeepAliveService<R, E, F>;
59    type Future = Ready<Result<Self::Service, Self::InitError>>;
60
61    fn new_service(&self, _: ()) -> Self::Future {
62        ok(KeepAliveService::new(
63            self.ka,
64            self.time.timer(),
65            self.f.clone(),
66        ))
67    }
68}
69
70pub struct KeepAliveService<R, E, F> {
71    f: F,
72    ka: Duration,
73    time: LowResTimeService,
74    delay: Delay,
75    expire: Instant,
76    _t: PhantomData<(R, E)>,
77}
78
79impl<R, E, F> KeepAliveService<R, E, F>
80where
81    F: Fn() -> E,
82{
83    pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self {
84        let expire = Instant::from_std(time.now() + ka);
85        KeepAliveService {
86            f,
87            ka,
88            time,
89            expire,
90            delay: delay_until(expire),
91            _t: PhantomData,
92        }
93    }
94}
95
96impl<R, E, F> Service for KeepAliveService<R, E, F>
97where
98    F: Fn() -> E,
99{
100    type Request = R;
101    type Response = R;
102    type Error = E;
103    type Future = Ready<Result<R, E>>;
104
105    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106        match Pin::new(&mut self.delay).poll(cx) {
107            Poll::Ready(_) => {
108                let now = Instant::from_std(self.time.now());
109                if self.expire <= now {
110                    Poll::Ready(Err((self.f)()))
111                } else {
112                    self.delay.reset(self.expire);
113                    let _ = Pin::new(&mut self.delay).poll(cx);
114                    Poll::Ready(Ok(()))
115                }
116            }
117            Poll::Pending => Poll::Ready(Ok(())),
118        }
119    }
120
121    fn call(&mut self, req: R) -> Self::Future {
122        self.expire = Instant::from_std(self.time.now() + self.ka);
123        ok(req)
124    }
125}