1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
use std::convert::Infallible; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use scrappy_rt::time::{delay_until, Delay, Instant}; use scrappy_service::{Service, ServiceFactory}; use futures::future::{ok, Ready}; use super::time::{LowResTime, LowResTimeService}; pub struct KeepAlive<R, E, F> { f: F, ka: Duration, time: LowResTime, _t: PhantomData<(R, E)>, } impl<R, E, F> KeepAlive<R, E, F> where F: Fn() -> E + Clone, { pub fn new(ka: Duration, time: LowResTime, f: F) -> Self { KeepAlive { f, ka, time, _t: PhantomData, } } } impl<R, E, F> Clone for KeepAlive<R, E, F> where F: Clone, { fn clone(&self) -> Self { KeepAlive { f: self.f.clone(), ka: self.ka, time: self.time.clone(), _t: PhantomData, } } } impl<R, E, F> ServiceFactory for KeepAlive<R, E, F> where F: Fn() -> E + Clone, { type Request = R; type Response = R; type Error = E; type InitError = Infallible; type Config = (); type Service = KeepAliveService<R, E, F>; type Future = Ready<Result<Self::Service, Self::InitError>>; fn new_service(&self, _: ()) -> Self::Future { ok(KeepAliveService::new( self.ka, self.time.timer(), self.f.clone(), )) } } pub struct KeepAliveService<R, E, F> { f: F, ka: Duration, time: LowResTimeService, delay: Delay, expire: Instant, _t: PhantomData<(R, E)>, } impl<R, E, F> KeepAliveService<R, E, F> where F: Fn() -> E, { pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self { let expire = Instant::from_std(time.now() + ka); KeepAliveService { f, ka, time, expire, delay: delay_until(expire), _t: PhantomData, } } } impl<R, E, F> Service for KeepAliveService<R, E, F> where F: Fn() -> E, { type Request = R; type Response = R; type Error = E; type Future = Ready<Result<R, E>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { let now = Instant::from_std(self.time.now()); if self.expire <= now { Poll::Ready(Err((self.f)())) } else { self.delay.reset(self.expire); let _ = Pin::new(&mut self.delay).poll(cx); Poll::Ready(Ok(())) } } Poll::Pending => Poll::Ready(Ok(())), } } fn call(&mut self, req: R) -> Self::Future { self.expire = Instant::from_std(self.time.now() + self.ka); ok(req) } }