use std::time::{Duration, Instant};
use std::sync::{RwLock, Arc};
use log::*;
use futures::*;
use futures::{Future, Poll, Stream};
use tokio::timer::Delay;
use crate::session::stomp_session::StompSession;
const MIN_TIMEOUT: i64 = 15000;
pub struct InterruptibleInterval {
delay: Delay,
session: Arc<RwLock<StompSession>>,
}
impl InterruptibleInterval {
pub fn new(session: Arc<RwLock<StompSession>>) -> InterruptibleInterval {
InterruptibleInterval::new_with_delay(Delay::new(Instant::now()), session)
}
pub(crate) fn new_with_delay(delay: Delay, session: Arc<RwLock<StompSession>>) -> InterruptibleInterval {
InterruptibleInterval { delay, session }
}
}
impl Stream for InterruptibleInterval {
type Item = Instant;
type Error = tokio::timer::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!("interruptible interval polled");
if self.session.read().unwrap().shutdown_pending() {
return Ok(Async::Ready(Some(Instant::now())));
}
let _ = match self.delay.poll() {
Ok(Async::Ready(t)) => t,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_e) => return Err(tokio::timer::Error::shutdown()),
};
let now = self.delay.deadline();
let mut next_timeout;
{
next_timeout = self.session.read().unwrap().next_timeout();
if next_timeout < MIN_TIMEOUT {
next_timeout = MIN_TIMEOUT;
}
debug!("next timeout is {}", next_timeout);
}
self.delay.reset(now + Duration::from_millis(next_timeout as u64));
Ok(Some(now).into())
}
}