romp 0.5.2

STOMP server and WebSockets platform
Documentation
use std::time::{Duration, Instant};
use std::sync::{RwLock, Arc};

use log::*;

use futures::*;
use futures::{Future, Poll, Stream};

use tokio::timer::Delay;

use crate::session::stomp_session::StompSession;

/// Lower bound, in milliseconds, for the gap between two ticks of the interval: a session
/// timeout smaller than this is raised to this value before the next tick is scheduled.
const MIN_TIMEOUT: i64 = 15000;

/// A stream of tick notifications whose period is taken from a STOMP session.  This is a clone
/// of the Tokio `Interval` code with an added thread-safe session handle, which is consulted on
/// every poll so that a pending shutdown can interrupt the wait.

// `Debug` is intentionally not derived.
// NOTE(review): presumably because `StompSession` does not implement `Debug` — confirm.
pub struct InterruptibleInterval {
    /// Future that completes the next time the interval should yield a value.
    delay: Delay,

    /// Shared session, read on each poll to check for a pending shutdown and to obtain the
    /// timeout used to schedule the next tick.
    session: Arc<RwLock<StompSession>>,
}

impl InterruptibleInterval {
    /// Builds an interval bound to `session` whose first tick is due immediately.
    ///
    /// The initial delay is created with a deadline of `Instant::now()`; subsequent ticks are
    /// scheduled by the `Stream` implementation from `session.next_timeout()`.
    pub fn new(session: Arc<RwLock<StompSession>>) -> InterruptibleInterval {
        let first_tick = Delay::new(Instant::now());
        Self::new_with_delay(first_tick, session)
    }

    /// Assembles an interval from an already constructed `delay` and the `session` to watch.
    ///
    /// The first tick fires when `delay` completes.
    pub(crate) fn new_with_delay(delay: Delay, session: Arc<RwLock<StompSession>>) -> InterruptibleInterval {
        Self { delay, session }
    }
}

impl Stream for InterruptibleInterval {
    type Item = Instant;
    type Error = tokio::timer::Error;

    /// Polls the interval for its next tick.
    ///
    /// # Returns
    ///
    /// * `Ready(Some(Instant::now()))` immediately, without touching the timer, whenever the
    ///   session reports `shutdown_pending()`.
    /// * `NotReady` while the current delay has not yet elapsed.
    /// * `Ready(Some(deadline))` once the delay has elapsed, where `deadline` is the deadline of
    ///   the delay that just fired.  The delay is then re-armed for `deadline` plus
    ///   `session.next_timeout()` milliseconds, raised to `MIN_TIMEOUT` if smaller.
    ///
    /// The stream never yields `None`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying `Delay` unchanged, so callers can tell a
    /// timer shutdown apart from other timer failures.
    ///
    /// # Panics
    ///
    /// Panics if the session lock is poisoned.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        debug!("interruptible interval polled");

        // Interrupt: a pending shutdown makes the stream ready right away so the consumer
        // does not have to wait for the timer to fire.
        if self.session.read().unwrap().shutdown_pending() {
            return Ok(Async::Ready(Some(Instant::now())));
        }

        // Wait for the delay to be done.  The timer error is propagated as-is rather than
        // being replaced with a synthetic "shutdown" error, which would hide its real cause.
        match self.delay.poll()? {
            Async::Ready(_) => {}
            Async::NotReady => return Ok(Async::NotReady),
        }

        // Get the `now` by looking at the `delay` deadline, so ticks do not drift with
        // polling latency.
        let now = self.delay.deadline();

        // The read guard is a temporary of this statement and is released before logging.
        let next_timeout = self.session.read().unwrap().next_timeout().max(MIN_TIMEOUT);
        debug!("next timeout is {}", next_timeout);

        // `next_timeout` is at least `MIN_TIMEOUT` (positive), so the cast cannot wrap.
        self.delay.reset(now + Duration::from_millis(next_timeout as u64));

        // Return the instant at which this tick was due.
        Ok(Some(now).into())
    }
}