#[macro_use]
extern crate futures;
extern crate tokio_timer;
#[cfg(test)]
extern crate tokio_executor;
use futures::{
future::Fuse,
prelude::*,
sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
};
use std::time::Duration;
use tokio_timer::{clock::now as clock_now, Delay};
#[derive(Clone)]
pub struct Warden {
pub(crate) notify_tx: UnboundedSender<bool>,
}
pub struct Evacuate<F: Future> {
count: u64,
notify_rx: UnboundedReceiver<bool>,
tripwire: Fuse<F>,
timeout_ms: u64,
timeout: Delay,
}
impl Warden {
pub fn increment(&self) { let _ = self.notify_tx.unbounded_send(true); }
pub fn decrement(&self) { let _ = self.notify_tx.unbounded_send(false); }
}
impl<F: Future> Evacuate<F> {
pub fn new(tripwire: F, timeout_ms: u64) -> (Warden, Evacuate<F>) {
let (notify_tx, notify_rx) = unbounded();
let warden = Warden { notify_tx };
let evacuate = Evacuate {
count: 0,
notify_rx,
tripwire: tripwire.fuse(),
timeout_ms,
timeout: Delay::new(clock_now()),
};
(warden, evacuate)
}
}
impl<F: Future> Future for Evacuate<F> {
type Error = ();
type Item = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
while let Ok(Async::Ready(Some(state))) = self.notify_rx.poll() {
if state {
self.count += 1;
} else {
self.count -= 1;
}
}
if !self.tripwire.is_done() {
let _ = try_ready!(self.tripwire.poll().map_err(|_| ()));
self.timeout.reset(clock_now() + Duration::from_millis(self.timeout_ms));
}
if self.count == 0 {
return Ok(Async::Ready(()));
}
self.timeout.poll().map_err(|_| ())
}
}
#[cfg(test)]
mod tests {
extern crate tokio_executor;
#[macro_use]
mod support;
use self::support::*;
use super::Evacuate;
use futures::{
future::{empty, ok},
Future,
};
#[test]
fn test_evacuate_stops_at_tripwire() {
mocked(|_, _| {
let tripwire = empty::<(), ()>();
let (_warden, mut evacuate) = Evacuate::new(tripwire, 10000);
assert_not_ready!(evacuate);
});
}
#[test]
fn test_evacuate_falls_through_on_tripwire() {
mocked(|_, _| {
let tripwire = ok::<(), ()>(());
let (_warden, mut evacuate) = Evacuate::new(tripwire, 10000);
assert_ready!(evacuate);
});
}
#[test]
fn test_evacuate_stops_after_tripping_with_clients() {
mocked(|_, _| {
let tripwire = ok::<(), ()>(());
let (warden, mut evacuate) = Evacuate::new(tripwire, 10000);
warden.increment();
assert_not_ready!(evacuate);
});
}
#[test]
fn test_evacuate_completes_after_client_count_ping_pong() {
mocked(|_, _| {
let tripwire = ok::<(), ()>(());
let (warden, mut evacuate) = Evacuate::new(tripwire, 10000);
warden.increment();
assert_not_ready!(evacuate);
warden.increment();
assert_not_ready!(evacuate);
warden.decrement();
warden.decrement();
assert_ready!(evacuate);
});
}
#[test]
fn test_evacuate_delay_before_clients_hit_zero() {
mocked(|timer, _| {
let tripwire = ok::<(), ()>(());
let (warden, mut evacuate) = Evacuate::new(tripwire, 10000);
warden.increment();
assert_not_ready!(evacuate);
warden.increment();
assert_not_ready!(evacuate);
warden.decrement();
assert_not_ready!(evacuate);
advance(timer, ms(10001));
assert_ready!(evacuate);
});
}
}