pub use self::implementation::Timer;
#[cfg(not(feature = "use-mock-crust"))]
mod implementation {
use action::Action;
use itertools::Itertools;
use maidsafe_utilities::thread::{self, Joiner};
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use types::RoutingActionSender;
struct Detail {
deadlines: BTreeMap<Instant, Vec<u64>>,
cancelled: bool,
}
pub struct Timer {
next_token: u64,
detail_and_cond_var: Arc<(Mutex<Detail>, Condvar)>,
_worker: Joiner,
}
impl Timer {
pub fn new(sender: RoutingActionSender) -> Self {
let detail = Detail {
deadlines: BTreeMap::new(),
cancelled: false,
};
let detail_and_cond_var = Arc::new((Mutex::new(detail), Condvar::new()));
let detail_and_cond_var_clone = detail_and_cond_var.clone();
let worker = thread::named("Timer", move || Self::run(sender, detail_and_cond_var));
Timer {
next_token: 0,
detail_and_cond_var: detail_and_cond_var_clone,
_worker: worker,
}
}
pub fn schedule(&mut self, duration: Duration) -> u64 {
let token = self.next_token;
self.next_token = token.wrapping_add(1);
let &(ref mutex, ref cond_var) = &*self.detail_and_cond_var;
let mut detail = mutex.lock().expect("Failed to lock.");
detail.deadlines
.entry(Instant::now() + duration)
.or_insert_with(Vec::new)
.push(token);
cond_var.notify_one();
token
}
fn run(sender: RoutingActionSender, detail_and_cond_var: Arc<(Mutex<Detail>, Condvar)>) {
let &(ref mutex, ref cond_var) = &*detail_and_cond_var;
let mut detail = mutex.lock().expect("Failed to lock.");
while !detail.cancelled {
let now = Instant::now();
let expired_list = detail.deadlines
.keys()
.take_while(|&&deadline| deadline < now)
.cloned()
.collect_vec();
for expired in expired_list {
let tokens = detail.deadlines.remove(&expired).expect("Bug in `BTreeMap`.");
for token in tokens {
let _ = sender.send(Action::Timeout(token));
}
}
if detail.deadlines.is_empty() {
detail = cond_var.wait(detail).expect("Failed to lock.");
} else {
let nearest = detail.deadlines
.keys()
.next()
.cloned()
.expect("Bug in `BTreeMap`.");
let duration = nearest - now;
detail = cond_var.wait_timeout(detail, duration).expect("Failed to lock.").0;
}
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
let &(ref mutex, ref cond_var) = &*self.detail_and_cond_var;
let mut detail = mutex.lock().expect("Failed to lock.");
detail.cancelled = true;
cond_var.notify_one();
}
}
#[cfg(test)]
mod tests {
use super::*;
use action::Action;
use maidsafe_utilities::event_sender::MaidSafeEventCategory;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
use types::RoutingActionSender;
#[test]
fn schedule() {
let (action_sender, action_receiver) = mpsc::channel();
let (category_sender, category_receiver) = mpsc::channel();
let routing_event_category = MaidSafeEventCategory::Routing;
let sender = RoutingActionSender::new(action_sender,
routing_event_category,
category_sender.clone());
let interval = Duration::from_millis(500);
let instant_when_added;
let check_no_events_received = || {
let category = category_receiver.try_recv();
assert!(category.is_err(),
"Expected no event, but received {:?}",
category);
let action = action_receiver.try_recv();
assert!(action.is_err(),
"Expected no event, but received {:?}",
action);
};
{
let mut timer = Timer::new(sender);
let count = 5;
for i in 0..count {
let timeout = interval * (count - i);
let token = timer.schedule(timeout);
assert_eq!(token, i as u64);
}
thread::sleep(Duration::from_millis(100));
for i in 0..count {
check_no_events_received();
thread::sleep(interval);
let category = category_receiver.try_recv();
match category.expect("Should have received a category.") {
MaidSafeEventCategory::Routing => (),
unexpected_category => {
panic!("Expected `MaidSafeEventCategory::Routing`, but received {:?}",
unexpected_category);
}
}
let action = action_receiver.try_recv();
match action.expect("Should have received an action.") {
Action::Timeout(token) => assert_eq!(token, (count - i - 1) as u64),
unexpected_action => {
panic!("Expected `Action::Timeout`, but received {:?}",
unexpected_action);
}
}
}
instant_when_added = Instant::now();
let _ = timer.schedule(interval);
}
assert!(Instant::now() - instant_when_added < interval,
"`Timer::drop()` is blocking.");
thread::sleep(interval + Duration::from_millis(100));
check_no_events_received();
}
}
}
#[cfg(feature = "use-mock-crust")]
mod implementation {
use std::time::Duration;
use types::RoutingActionSender;
pub struct Timer {
next_token: u64,
}
impl Timer {
pub fn new(_: RoutingActionSender) -> Self {
Timer { next_token: 0 }
}
pub fn schedule(&mut self, _: Duration) -> u64 {
let token = self.next_token;
self.next_token = token.wrapping_add(1);
token
}
}
}