pub use self::implementation::Timer;
#[cfg(not(feature = "use-mock-crust"))]
mod implementation {
use action::Action;
use itertools::Itertools;
use maidsafe_utilities::thread::{self, Joiner};
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::mpsc::{self, Receiver, RecvError, RecvTimeoutError, SyncSender};
use std::time::{Duration, Instant};
use types::RoutingActionSender;
struct Detail {
expiry: Instant,
token: u64,
}
#[derive(Clone)]
pub struct Timer {
inner: Rc<RefCell<Inner>>,
}
struct Inner {
next_token: u64,
tx: SyncSender<Detail>,
_worker: Joiner,
}
impl Timer {
pub fn new(sender: RoutingActionSender) -> Self {
let (tx, rx) = mpsc::sync_channel(1);
let worker = thread::named("Timer", move || Self::run(sender, rx));
Timer {
inner: Rc::new(RefCell::new(Inner {
next_token: 0,
tx,
_worker: worker,
})),
}
}
pub fn schedule(&self, duration: Duration) -> u64 {
let mut inner = self.inner.borrow_mut();
let token = inner.next_token;
inner.next_token = token.wrapping_add(1);
let detail = Detail {
expiry: Instant::now() + duration,
token,
};
inner.tx.send(detail).map(|()| token).unwrap_or_else(|e| {
error!("Timer could not be scheduled: {:?}", e);
0
})
}
fn run(sender: RoutingActionSender, rx: Receiver<Detail>) {
let mut deadlines: BTreeMap<Instant, Vec<u64>> = Default::default();
loop {
let r = if let Some(t) = deadlines.keys().next() {
let now = Instant::now();
if *t > now {
let duration = *t - now;
match rx.recv_timeout(duration) {
Ok(d) => Some(d),
Err(RecvTimeoutError::Timeout) => None,
Err(RecvTimeoutError::Disconnected) => break,
}
} else {
None
}
} else {
match rx.recv() {
Ok(d) => Some(d),
Err(RecvError) => break,
}
};
if let Some(Detail { expiry, token }) = r {
deadlines.entry(expiry).or_insert_with(Vec::new).push(token);
}
let now = Instant::now();
let expired_list = deadlines
.keys()
.take_while(|&&deadline| deadline < now)
.cloned()
.collect_vec();
for expired in expired_list {
let tokens = deadlines.remove(&expired).expect("Bug in `BTreeMap`.");
for token in tokens {
let _ = sender.send(Action::Timeout(token));
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use action::Action;
use maidsafe_utilities::event_sender::MaidSafeEventCategory;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use types::RoutingActionSender;
#[test]
fn schedule() {
let (action_sender, action_receiver) = mpsc::channel();
let (category_sender, category_receiver) = mpsc::channel();
let routing_event_category = MaidSafeEventCategory::Routing;
let sender = RoutingActionSender::new(
action_sender,
routing_event_category,
category_sender.clone(),
);
let interval = Duration::from_millis(500);
let instant_when_added;
let check_no_events_received = || {
let category = category_receiver.try_recv();
assert!(
category.is_err(),
"Expected no event, but received {:?}",
category
);
let action = action_receiver.try_recv();
assert!(
action.is_err(),
"Expected no event, but received {:?}",
action
);
};
{
let timer = Timer::new(sender);
let count = 5;
for i in 0..count {
let timeout = interval * (count - i);
let token = timer.schedule(timeout);
assert_eq!(token, u64::from(i));
}
thread::sleep(Duration::from_millis(100));
for i in 0..count {
check_no_events_received();
thread::sleep(interval);
let category = category_receiver.try_recv();
match category.expect("Should have received a category.") {
MaidSafeEventCategory::Routing => (),
unexpected_category => {
panic!(
"Expected `MaidSafeEventCategory::Routing`, but received {:?}",
unexpected_category
);
}
}
let action = action_receiver.try_recv();
match action.expect("Should have received an action.") {
Action::Timeout(token) => assert_eq!(token, u64::from(count - i - 1)),
unexpected_action => {
panic!(
"Expected `Action::Timeout`, but received {:?}",
unexpected_action
);
}
}
}
instant_when_added = Instant::now();
let _ = timer.schedule(interval);
}
assert!(
Instant::now() - instant_when_added < interval,
"`Timer::drop()` is blocking."
);
thread::sleep(interval + Duration::from_millis(100));
check_no_events_received();
}
#[test]
fn heavy_duty_time_out() {
let (action_sender, _action_receiver) = mpsc::channel();
let (category_sender, _category_receiver) = mpsc::channel();
let routing_event_category = MaidSafeEventCategory::Routing;
let sender = RoutingActionSender::new(
action_sender,
routing_event_category,
category_sender.clone(),
);
let timer = Timer::new(sender);
for _ in 0..1000 {
let _ = timer.schedule(Duration::new(0, 3000));
}
}
}
}
#[cfg(feature = "use-mock-crust")]
mod implementation {
use fake_clock::FakeClock as Instant;
use itertools::Itertools;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;
use types::RoutingActionSender;
struct Inner {
next_token: u64,
deadlines: BTreeMap<Instant, Vec<u64>>,
}
#[derive(Clone)]
pub struct Timer {
inner: Rc<RefCell<Inner>>,
}
impl Timer {
pub fn new(_action_sender: RoutingActionSender) -> Self {
Timer {
inner: Rc::new(RefCell::new(Inner {
next_token: 0,
deadlines: Default::default(),
})),
}
}
pub fn schedule(&self, duration: Duration) -> u64 {
let mut inner = self.inner.borrow_mut();
let token = inner.next_token;
inner.next_token = token.wrapping_add(1);
inner
.deadlines
.entry(Instant::now() + duration)
.or_insert_with(Vec::new)
.push(token);
token
}
pub fn get_timed_out_tokens(&mut self) -> Vec<u64> {
let mut inner = self.inner.borrow_mut();
let now = Instant::now();
let expired_list = inner
.deadlines
.keys()
.take_while(|&&deadline| deadline < now)
.cloned()
.collect_vec();
let mut expired_tokens = Vec::new();
for expired in expired_list {
let tokens = unwrap!(inner.deadlines.remove(&expired));
expired_tokens.extend(tokens);
}
expired_tokens
}
}
}