actori_utils/
keepalive.rs1use std::convert::Infallible;
2use std::future::Future;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7
8use actori_rt::time::{delay_until, Delay, Instant};
9use actori_service::{Service, ServiceFactory};
10use futures::future::{ok, Ready};
11
12use super::time::{LowResTime, LowResTimeService};
13
14pub struct KeepAlive<R, E, F> {
15 f: F,
16 ka: Duration,
17 time: LowResTime,
18 _t: PhantomData<(R, E)>,
19}
20
21impl<R, E, F> KeepAlive<R, E, F>
22where
23 F: Fn() -> E + Clone,
24{
25 pub fn new(ka: Duration, time: LowResTime, f: F) -> Self {
26 KeepAlive {
27 f,
28 ka,
29 time,
30 _t: PhantomData,
31 }
32 }
33}
34
35impl<R, E, F> Clone for KeepAlive<R, E, F>
36where
37 F: Clone,
38{
39 fn clone(&self) -> Self {
40 KeepAlive {
41 f: self.f.clone(),
42 ka: self.ka,
43 time: self.time.clone(),
44 _t: PhantomData,
45 }
46 }
47}
48
49impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
50where
51 F: Fn() -> E + Clone,
52{
53 type Request = R;
54 type Response = R;
55 type Error = E;
56 type InitError = Infallible;
57 type Config = ();
58 type Service = KeepAliveService<R, E, F>;
59 type Future = Ready<Result<Self::Service, Self::InitError>>;
60
61 fn new_service(&self, _: ()) -> Self::Future {
62 ok(KeepAliveService::new(
63 self.ka,
64 self.time.timer(),
65 self.f.clone(),
66 ))
67 }
68}
69
70pub struct KeepAliveService<R, E, F> {
71 f: F,
72 ka: Duration,
73 time: LowResTimeService,
74 delay: Delay,
75 expire: Instant,
76 _t: PhantomData<(R, E)>,
77}
78
79impl<R, E, F> KeepAliveService<R, E, F>
80where
81 F: Fn() -> E,
82{
83 pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self {
84 let expire = Instant::from_std(time.now() + ka);
85 KeepAliveService {
86 f,
87 ka,
88 time,
89 expire,
90 delay: delay_until(expire),
91 _t: PhantomData,
92 }
93 }
94}
95
96impl<R, E, F> Service for KeepAliveService<R, E, F>
97where
98 F: Fn() -> E,
99{
100 type Request = R;
101 type Response = R;
102 type Error = E;
103 type Future = Ready<Result<R, E>>;
104
105 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 match Pin::new(&mut self.delay).poll(cx) {
107 Poll::Ready(_) => {
108 let now = Instant::from_std(self.time.now());
109 if self.expire <= now {
110 Poll::Ready(Err((self.f)()))
111 } else {
112 self.delay.reset(self.expire);
113 let _ = Pin::new(&mut self.delay).poll(cx);
114 Poll::Ready(Ok(()))
115 }
116 }
117 Poll::Pending => Poll::Ready(Ok(())),
118 }
119 }
120
121 fn call(&mut self, req: R) -> Self::Future {
122 self.expire = Instant::from_std(self.time.now() + self.ka);
123 ok(req)
124 }
125}