1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
use std::convert::Infallible; use std::marker::PhantomData; use std::time::{Duration, Instant}; use actix_service::{NewService, Service}; use futures::future::{ok, FutureResult}; use futures::{Async, Future, Poll}; use tokio_timer::Delay; use super::time::{LowResTime, LowResTimeService}; pub struct KeepAlive<R, E, F> { f: F, ka: Duration, time: LowResTime, _t: PhantomData<(R, E)>, } impl<R, E, F> KeepAlive<R, E, F> where F: Fn() -> E + Clone, { pub fn new(ka: Duration, time: LowResTime, f: F) -> Self { KeepAlive { f, ka, time, _t: PhantomData, } } } impl<R, E, F> Clone for KeepAlive<R, E, F> where F: Clone, { fn clone(&self) -> Self { KeepAlive { f: self.f.clone(), ka: self.ka, time: self.time.clone(), _t: PhantomData, } } } impl<R, E, F> NewService for KeepAlive<R, E, F> where F: Fn() -> E + Clone, { type Request = R; type Response = R; type Error = E; type InitError = Infallible; type Config = (); type Service = KeepAliveService<R, E, F>; type Future = FutureResult<Self::Service, Self::InitError>; fn new_service(&self, _: &()) -> Self::Future { ok(KeepAliveService::new( self.ka, self.time.timer(), self.f.clone(), )) } } pub struct KeepAliveService<R, E, F> { f: F, ka: Duration, time: LowResTimeService, delay: Delay, expire: Instant, _t: PhantomData<(R, E)>, } impl<R, E, F> KeepAliveService<R, E, F> where F: Fn() -> E, { pub fn new(ka: Duration, time: LowResTimeService, f: F) -> Self { let expire = time.now() + ka; KeepAliveService { f, ka, time, expire, delay: Delay::new(expire), _t: PhantomData, } } } impl<R, E, F> Service for KeepAliveService<R, E, F> where F: Fn() -> E, { type Request = R; type Response = R; type Error = E; type Future = FutureResult<R, E>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { match self.delay.poll() { Ok(Async::Ready(_)) => { let now = self.time.now(); if self.expire <= now { Err((self.f)()) } else { self.delay.reset(self.expire); let _ = self.delay.poll(); Ok(Async::Ready(())) } } Ok(Async::NotReady) => Ok(Async::Ready(())), Err(_e) => panic!(), } } fn call(&mut self, req: R) -> Self::Future { self.expire = self.time.now() + self.ka; ok(req) } }