use std::iter::Iterator;
use std::hash::Hash;
use std::collections::HashSet;
use std::mem;
use std::fmt::Debug;
use std::time::Duration;
use super::{InnerWheel, Wheel, Resolution, wheel_sizes};
/// A hierarchy of timer wheels that stores a copy of every key directly in the wheel slots.
///
/// A key is considered active while it is present in `keys`; slot entries whose key is no
/// longer in that set are dropped without being reported when their slot is drained.
pub struct CopyWheel<T>
where
    T: Eq + Hash + Debug + Clone,
{
    /// Resolutions served by this hierarchy; a resolution's position is used as the index
    /// into `wheels` and `slot_indexes`.
    resolutions: Vec<Resolution>,
    /// Keys of the timers that are currently active.
    keys: HashSet<T>,
    /// The inner wheels holding the scheduled key copies.
    wheels: Vec<InnerWheel<T>>,
    /// Current slot position of each wheel in `wheels`.
    slot_indexes: Vec<usize>,
}
impl<T: Eq + Hash + Debug + Clone> CopyWheel<T> {
pub fn new(mut resolutions: Vec<Resolution>) -> CopyWheel<T> {
let sizes = wheel_sizes(&mut resolutions);
let indexes = vec![0; sizes.len()];
CopyWheel {
resolutions: resolutions,
keys: HashSet::new(),
wheels: sizes.iter().map(|size| InnerWheel::new(*size)).collect(),
slot_indexes: indexes
}
}
fn insert_hours(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
let slot = time.as_secs()/3600;
self.insert(key, time, Resolution::Hour, slot as usize + 1)
}
fn insert_minutes(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
let slot = time.as_secs()/60;
self.insert(key, time, Resolution::Min, slot as usize + 1)
}
fn insert_seconds(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
self.insert(key, time, Resolution::Sec, time.as_secs() as usize + 1)
}
fn insert_hundred_ms(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
let slot = time.subsec_nanos()/(1000*1000*100);
self.insert(key, time, Resolution::HundredMs, slot as usize + 1)
}
fn insert_ten_ms(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
let slot = time.subsec_nanos()/(1000*1000*10);
self.insert(key, time, Resolution::TenMs, slot as usize + 1)
}
fn insert_ms(&mut self, key: T, time: Duration) -> Result<(), (T, Duration)> {
let slot = time.subsec_nanos()/(1000*1000);
self.insert(key, time, Resolution::Ms, slot as usize + 1)
}
fn insert(&mut self,
key: T,
time: Duration,
resolution: Resolution,
mut slot: usize) -> Result<(), (T, Duration)>
{
if slot == 1 { return Err((key, time)); }
if let Some(wheel_index) = self.resolutions.iter().rposition(|ref r| **r == resolution) {
let max_slot = self.wheels[wheel_index].slots.len();
if slot > max_slot {
slot = max_slot
}
let slot_index = (self.slot_indexes[wheel_index] + slot) % max_slot;
self.wheels[wheel_index].slots[slot_index].entries.push(key);
return Ok(());
}
Err((key, time))
}
}
impl<T: Eq + Hash + Debug + Clone> Wheel<T> for CopyWheel<T> {
    /// Starts a timer for `key`.
    ///
    /// The key is offered to each resolution in turn, from hours down to milliseconds, and
    /// stored by the first one that accepts it. The key is recorded as active only when it
    /// was actually scheduled, so a timer that no wheel accepts does not leave a key behind
    /// that could never expire. A key that is already active stays active either way.
    fn start(&mut self, key: T, time: Duration) {
        let scheduled = self.insert_hours(key.clone(), time)
            .or_else(|(k, t)| self.insert_minutes(k, t))
            .or_else(|(k, t)| self.insert_seconds(k, t))
            .or_else(|(k, t)| self.insert_hundred_ms(k, t))
            .or_else(|(k, t)| self.insert_ten_ms(k, t))
            .or_else(|(k, t)| self.insert_ms(k, t))
            .is_ok();
        if scheduled {
            self.keys.insert(key);
        }
    }

    /// Cancels the timer for `key` by removing it from the active set.
    ///
    /// Copies of the key already sitting in wheel slots are left in place; `expire` skips
    /// them because the key is no longer active.
    fn stop(&mut self, key: T) {
        self.keys.remove(&key);
    }

    /// Advances the wheels by one tick and returns the keys whose timers fired.
    ///
    /// The first wheel always advances by one slot; each following wheel advances only when
    /// the wheel before it wrapped around to slot 0. Every visited slot is drained, and an
    /// entry is reported only if its key was still active, in which case the key is also
    /// removed from the active set.
    fn expire(&mut self) -> Vec<T> {
        // Move the active-key set into a local so the filter closure can mutate it without
        // borrowing `self` while the wheels are iterated mutably; it is put back below.
        let mut active = mem::replace(&mut self.keys, HashSet::new());
        let mut expired = Vec::new();
        for (wheel, slot_index) in self.wheels.iter_mut().zip(self.slot_indexes.iter_mut()) {
            *slot_index = (*slot_index + 1) % wheel.slots.len();
            let due = wheel.slots[*slot_index].entries.drain(..);
            expired.extend(due.filter(|key| active.remove(key)));
            if *slot_index != 0 {
                // This wheel did not wrap, so coarser wheels do not move on this tick.
                break;
            }
        }
        self.keys = active;
        expired
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use super::super::{Resolution, Wheel};

    /// Returns one resolution, one duration and one key per wheel, ordered from the finest
    /// resolution to the coarsest. The i-th duration is meant to land on the i-th wheel.
    fn values() -> (Vec<Resolution>, Vec<Duration>, Vec<&'static str>) {
        let resolutions = vec![
            Resolution::Ms,
            Resolution::TenMs,
            Resolution::HundredMs,
            Resolution::Sec,
            Resolution::Min,
            Resolution::Hour
        ];

        let times = vec![
            Duration::from_millis(5),
            Duration::from_millis(35),
            Duration::from_millis(150),
            Duration::from_secs(5) + Duration::from_millis(10),
            Duration::from_secs(5*60) + Duration::from_secs(10),
            Duration::from_secs(5*3600) + Duration::from_secs(10)
        ];

        let keys = vec!["a", "b", "c", "d", "e", "f"];

        (resolutions, times, keys)
    }

    #[test]
    fn start_and_expire() {
        let (resolutions, times, keys) = values();
        let mut wheel = CopyWheel::new(resolutions);
        for (key, time) in keys.into_iter().zip(times) {
            wheel.start(key, time);
        }
        verify_expire(&mut wheel);
    }

    #[test]
    fn start_and_stop_then_expire() {
        let (resolutions, times, keys) = values();
        let mut wheel = CopyWheel::new(resolutions);
        for (key, time) in keys.clone().into_iter().zip(times) {
            wheel.start(key, time);
        }
        verify_wheel_and_slot_position(&mut wheel);
        for key in keys {
            wheel.stop(key);
        }
        verify_stopped_keys_never_expire(&mut wheel);
    }

    /// Checks that every wheel holds exactly one entry, the key started for that wheel,
    /// in the expected slot, and that all other slots are empty.
    fn verify_wheel_and_slot_position(wheel: &mut CopyWheel<&'static str>) {
        let (_, _, keys) = values();
        let expected_slots = [6, 4, 2, 6, 6, 6];
        for i in 0..wheel.wheels.len() {
            for j in 0..wheel.wheels[i].slots.len() {
                let entries = &wheel.wheels[i].slots[j].entries;
                if j == expected_slots[i] {
                    assert_eq!(1, entries.len());
                    assert_eq!(keys[i], entries[0]);
                } else {
                    assert_eq!(0, entries.len());
                }
            }
        }
    }

    /// Ticks the wheel through the point where the minute-resolution key would have fired
    /// and checks that nothing is reported once every key has been stopped.
    fn verify_stopped_keys_never_expire(wheel: &mut CopyWheel<&'static str>) {
        // 6 * 60000 ticks includes tick index 6 * 60000 - 1, the one on which the minute
        // key is expected to fire in `verify_expire`.
        let total_ticks = 6 * 60000;
        for _ in 0..total_ticks {
            let expired = wheel.expire();
            assert_eq!(0, expired.len());
        }
    }

    /// Ticks the wheel and checks that each key fires alone, in order, on its expected tick.
    ///
    /// Only the first five keys are covered; the hour-resolution key has no entry in
    /// `expected_ticks`.
    fn verify_expire(wheel: &mut CopyWheel<&'static str>) {
        let (_, _, keys) = values();
        // Zero-based index of the `expire` call on which each key is expected to fire.
        let expected_ticks = [
            5,
            4 * 10 - 1,
            2 * 100 - 1,
            6 * 1000 - 1,
            6 * 60000 - 1,
        ];
        let last_tick = expected_ticks[expected_ticks.len() - 1];

        let mut match_count = 0;
        // The upper bound is exclusive, so add one to make sure the last expected tick runs.
        for i in 0..last_tick + 1 {
            let expired = wheel.expire();
            if expected_ticks.contains(&i) {
                assert_eq!(1, expired.len());
                assert_eq!(keys[match_count], expired[0]);
                match_count += 1;
            } else {
                assert_eq!(0, expired.len());
            }
        }
        // Guard against the loop silently skipping an expected expiry.
        assert_eq!(expected_ticks.len(), match_count);
    }
}