use pendulum::Pendulum;
use futures::Stream;
use std::sync::atomic::{Ordering, AtomicUsize};
use pendulum::Token;
use error::{PendulumResult, PendulumErrorKind, PendulumError};
use std::collections::HashMap;
use std::thread::Thread;
use std::sync::Arc;
use std::thread;
use futures::Poll;
use futures::Future;
use futures::Async;
use futures::task::{self, Task};
use std::time::Instant;
use std::time::Duration;
use crossbeam::sync::SegQueue;
const DEFAULT_CHANNEL_CAPACITY: usize = 128;
pub struct TimerBuilder {
channel_capacity: usize
}
impl TimerBuilder {
pub fn with_channel_capacity(mut self, capacity: usize) -> TimerBuilder {
self.channel_capacity = capacity;
self
}
pub fn channel_capacity(&self) -> usize {
self.channel_capacity
}
pub fn build<P>(self, pendulum: P) -> Timer
where P: Pendulum<TimerItem> + Send + 'static {
Timer::new(self, pendulum)
}
}
impl Default for TimerBuilder {
fn default() -> TimerBuilder {
TimerBuilder{ channel_capacity: DEFAULT_CHANNEL_CAPACITY }
}
}
pub struct TimedOut;
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutStatus<T> {
Original(T),
TimedOut
}
impl<T> From<TimedOut> for TimeoutStatus<T> {
fn from(_: TimedOut) -> TimeoutStatus<T> {
TimeoutStatus::TimedOut
}
}
pub struct Timeout<F> {
opt_sleep: Option<Sleep>,
future: F
}
impl<F> Future for Timeout<F> where F: Future, F::Error: From<TimedOut> {
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let opt_poll_result = self.opt_sleep.as_mut().map(Future::poll);
match opt_poll_result {
Some(Ok(Async::Ready(()))) => {
self.opt_sleep.take();
Err(TimedOut.into())
},
Some(_) => {
self.future.poll()
},
None => {
Err(TimedOut.into())
}
}
}
}
pub struct TimeoutStream<S> {
sleep: Sleep,
stream: S
}
impl<S> Stream for TimeoutStream<S> where S: Stream, S::Error: From<TimedOut> {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let sleep_result = self.sleep.poll();
match sleep_result {
Ok(Async::Ready(())) => {
Err(TimedOut.into())
},
_ => self.stream.poll()
}
}
}
pub struct Heartbeat<S> {
sleep: Sleep,
stream: S
}
impl<S> Stream for Heartbeat<S> where S: Stream, S::Item: From<TimedOut> {
type Item = S::Item;
type Error = S::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let sleep_result = self.sleep.poll();
match sleep_result {
Ok(Async::Ready(())) => {
self.sleep.restart();
Ok(Async::Ready(Some(TimedOut.into())))
},
_ => self.stream.poll()
}
}
}
pub struct SleepStream {
sleep: Sleep
}
impl SleepStream {
fn new(sleep: Sleep) -> SleepStream {
SleepStream{ sleep: sleep }
}
}
impl Stream for SleepStream {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Option<()>, ()> {
let poll_result = self.sleep.poll();
if let Ok(Async::Ready(())) = poll_result {
self.sleep.restart();
}
poll_result.map(|async| async.map(Option::Some))
}
}
pub struct Sleep {
mapping: usize,
duration: Duration,
started: Instant,
sent_task: Option<Task>,
futures: Timer
}
impl Sleep {
fn new(mapping: usize, duration: Duration, futures: Timer) -> Sleep {
Sleep{ mapping: mapping, duration: duration, started: Instant::now(), sent_task: None, futures: futures }
}
fn restart(&mut self) {
self.sent_task = None;
self.started = Instant::now();
}
}
impl Future for Sleep {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if Instant::now().duration_since(self.started) >= self.duration {
return Ok(Async::Ready(()))
}
let should_send_create = self.sent_task.as_ref()
.map(|task| !task.will_notify_current())
.unwrap_or(true);
if should_send_create {
let sent = self.futures.inner.try_push_create_timer(CreateTimeout{
mapping: self.mapping, duration: self.duration, started: self.started, task: task::current() });
if !sent {
warn!("Couldnt Send a Create Timeout Request From Sleep; Backing Thread May Be Running Slow");
task::current().notify();
} else {
self.sent_task = Some(task::current());
self.futures.thread.thread.unpark();
}
}
Ok(Async::NotReady)
}
}
impl Drop for Sleep {
fn drop(&mut self) {
if self.sent_task.is_some() {
let sent = self.futures.inner.try_push_delete_timer(DeleteTimeout{ mapping: self.mapping });
if !sent {
warn!("Couldnt Send A Delete Timeout Request From Sleep; Backing Thread May Be Running Slow");
self.futures.inner.return_mapping(self.mapping);
} else {
self.futures.thread.thread.unpark();
}
} else {
self.futures.inner.return_mapping(self.mapping);
}
}
}
struct UnparkOnDropThread {
thread: Thread
}
impl Drop for UnparkOnDropThread {
fn drop(&mut self) {
self.thread.unpark();
}
}
#[derive(Clone)]
pub struct Timer {
inner: Arc<InnerTimer>,
thread: Arc<UnparkOnDropThread>,
max_timeout: Duration
}
impl Timer {
pub fn new<P>(builder: TimerBuilder, pendulum: P) -> Timer
where P: Pendulum<TimerItem> + Send + 'static {
let inner = Arc::new(InnerTimer::new(pendulum.max_capacity(), builder.channel_capacity()));
let max_timeout = pendulum.max_timeout();
let thread_inner = inner.clone();
let thread_handle = thread::spawn(move || run_pendulum_timer(thread_inner, pendulum)).thread().clone();
Timer{ inner: inner, thread: Arc::new(UnparkOnDropThread{ thread: thread_handle }), max_timeout: max_timeout }
}
pub fn sleep(&self, duration: Duration) -> PendulumResult<Sleep, ()> {
self.validate_request(duration).map(|mapping| {
Sleep::new(mapping, duration, self.clone())
})
}
pub fn sleep_stream(&self, duration: Duration) -> PendulumResult<SleepStream, ()> {
self.sleep(duration).map(SleepStream::new)
}
pub fn timeout<F>(&self, duration: Duration, future: F) -> PendulumResult<Timeout<F>, ()>
where F: Future, F::Error: From<TimedOut> {
self.sleep(duration).map(|sleep| Timeout{ opt_sleep: Some(sleep), future: future })
}
pub fn timeout_stream<S>(&self, duration: Duration, stream: S) -> PendulumResult<TimeoutStream<S>, ()>
where S: Stream, S::Error: From<TimedOut> {
self.sleep(duration).map(|sleep| TimeoutStream{ sleep: sleep, stream: stream })
}
pub fn heartbeat<S>(&self, duration: Duration, stream: S) -> PendulumResult<Heartbeat<S>, ()>
where S: Stream, S::Item: From<TimedOut> {
self.sleep(duration).map(|sleep| Heartbeat{ sleep: sleep, stream: stream })
}
fn validate_request(&self, duration: Duration) -> PendulumResult<usize, ()> {
if duration > self.max_timeout {
Err(PendulumError::new((), PendulumErrorKind::MaxCapacityReached))
} else {
self.inner.try_retrieve_mapping()
.ok_or_else(|| PendulumError::new((), PendulumErrorKind::MaxTimeoutExceeded))
}
}
}
#[derive(Debug)]
pub struct TimerItem {
task: Task,
started: Instant,
duration: Duration,
mapping: usize
}
fn run_pendulum_timer<P>(inner: Arc<InnerTimer>, mut pendulum: P)
where P: Pendulum<TimerItem> {
let mut current_time;
let mut last_tick_time = Instant::now();
let mut leftover_tick = Duration::new(0, 0);
let mut mapping_table: HashMap<usize, Token> = HashMap::with_capacity(pendulum.max_capacity());
loop {
current_time = Instant::now();
let mut duration_since_last_tick = current_time.duration_since(last_tick_time) + leftover_tick;
while duration_since_last_tick >= pendulum.tick_duration() {
duration_since_last_tick -= pendulum.tick_duration();
pendulum.tick();
last_tick_time = current_time;
}
leftover_tick = duration_since_last_tick;
while let Some(request) = inner.try_pop_request() {
match request {
TimeoutRequest::Create(create_request) => {
current_time = Instant::now();
let time_to_schedule = current_time.duration_since(create_request.started);
let real_timeout = create_request.duration.checked_sub(time_to_schedule).unwrap_or(Duration::new(0, 0));
if real_timeout == Duration::new(0, 0) {
create_request.task.notify()
} else {
duration_since_last_tick = current_time.duration_since(last_tick_time) + leftover_tick;
let accurate_real_timeout = real_timeout + duration_since_last_tick;
let item = TimerItem{ task: create_request.task, started: create_request.started,
duration: create_request.duration, mapping: create_request.mapping };
let token = pendulum.insert_timeout(accurate_real_timeout, item)
.expect("pendulum: Failed To Push Timeout Onto Pendulum");
mapping_table.insert(create_request.mapping, token);
}
},
TimeoutRequest::Delete(delete_request) => {
let mapping = delete_request.mapping;
if let Some(token) = mapping_table.remove(&mapping) {
pendulum.remove_timeout(token);
}
inner.return_mapping(delete_request.mapping);
}
}
}
current_time = Instant::now();
while let Some(TimerItem{ task, started, duration, mapping }) = pendulum.expired_timeout() {
let total_time = current_time.duration_since(started);
if total_time < duration {
let requeue_duration = duration - total_time;
warn!("Task Was Ready Before Duration Of {:?} Was Up, Leftover Duration Was {:?}; Re-Queueing", duration, requeue_duration);
let item = TimerItem{ task: task, started: started, duration: duration, mapping: mapping };
let token = pendulum.insert_timeout(requeue_duration, item)
.expect("pendulum: Failed To Re-Push Timeout Onto Pendulum");
mapping_table.insert(mapping, token);
} else {
task.notify();
}
}
if Arc::strong_count(&inner) == 1 {
break;
} else {
let time_to_next_tick = pendulum.tick_duration() - leftover_tick;
thread::park_timeout(time_to_next_tick);
}
}
}
enum TimeoutRequest {
Create(CreateTimeout),
Delete(DeleteTimeout)
}
struct CreateTimeout {
mapping: usize,
duration: Duration,
started: Instant,
task: Task
}
struct DeleteTimeout {
mapping: usize
}
struct InnerTimer {
mapping_queue: SegQueue<usize>,
request_queue: (SegQueue<TimeoutRequest>, AtomicUsize),
channel_capacity: usize
}
impl InnerTimer {
pub fn new(timer_capacity: usize, channel_capacity: usize) -> InnerTimer {
let mapping_queue = SegQueue::new();
let mut next_mapping = 0;
for _ in 0..timer_capacity {
mapping_queue.push(next_mapping);
next_mapping += 1;
}
InnerTimer{ mapping_queue: mapping_queue, request_queue: (SegQueue::new(), AtomicUsize::new(0)),
channel_capacity: channel_capacity }
}
pub fn channel_capacity(&self) -> usize {
self.channel_capacity
}
pub fn try_retrieve_mapping(&self) -> Option<usize> {
self.mapping_queue.try_pop()
}
pub fn return_mapping(&self, mapping: usize) {
self.mapping_queue.push(mapping);
}
pub fn try_push_create_timer(&self, timer: CreateTimeout) -> bool {
try_push(&self.request_queue.0, &self.request_queue.1, self.channel_capacity(), TimeoutRequest::Create(timer))
}
pub fn try_push_delete_timer(&self, timer: DeleteTimeout) -> bool {
try_push(&self.request_queue.0, &self.request_queue.1, self.channel_capacity(), TimeoutRequest::Delete(timer))
}
pub fn try_pop_request(&self) -> Option<TimeoutRequest> {
self.request_queue.0.try_pop().map(|request| {
self.request_queue.1.fetch_sub(1, Ordering::AcqRel);
request
})
}
}
fn try_push<T>(queue: &SegQueue<T>, len: &AtomicUsize, capacity: usize, item: T) -> bool {
let queue_size = len.fetch_add(1, Ordering::AcqRel);
if queue_size >= capacity {
len.fetch_sub(1, Ordering::AcqRel);
false
} else {
queue.push(item);
true
}
}
#[cfg(test)]
mod tests {
use super::{TimerBuilder, TimeoutStatus};
use wheel::HashedWheelBuilder;
use std::time::{Duration};
use std::sync::Arc;
use std::mem;
use std::thread;
use futures::{Future, Stream};
use futures::sync::mpsc::{self, UnboundedReceiver};
use futures::future;
#[test]
fn positive_thread_shutdown_with_dropped_timer() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default()
.with_tick_duration(Duration::from_millis(10000))
.build()
);
assert_eq!(2, Arc::strong_count(&timer.inner));
let weak = Arc::downgrade(&timer.inner);
thread::sleep(Duration::from_millis(50));
mem::drop(timer);
thread::sleep(Duration::from_millis(50));
assert!(weak.upgrade().is_none());
}
#[test]
fn positive_thread_shutdown_with_dropped_clone() {
let timer_one = TimerBuilder::default()
.build(HashedWheelBuilder::default()
.with_tick_duration(Duration::from_millis(10000))
.build()
);
let timer_two = timer_one.clone();
assert_eq!(3, Arc::strong_count(&timer_one.inner));
let weak = Arc::downgrade(&timer_one.inner);
thread::sleep(Duration::from_millis(50));
mem::drop(timer_one);
mem::drop(timer_two);
thread::sleep(Duration::from_millis(50));
assert!(weak.upgrade().is_none());
}
#[test]
fn positive_thread_shutdown_with_dropped_sleep() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let sleep = timer.sleep(Duration::from_millis(100))
.unwrap();
let weak = Arc::downgrade(&timer.inner);
thread::sleep(Duration::from_millis(50));
mem::drop(timer);
mem::drop(sleep);
thread::sleep(Duration::from_millis(50));
assert!(weak.upgrade().is_none());
}
#[test]
fn positive_sleep_wakes_on_milli() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let sleep = timer
.sleep(Duration::from_millis(50))
.unwrap();
sleep.wait().unwrap();
}
#[test]
fn positive_sleep_wakes_on_nano() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let sleep = timer
.sleep(Duration::new(0, 1))
.unwrap();
sleep.wait().unwrap();
}
#[test]
fn positive_sleep_wakes_on_zero() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let sleep = timer
.sleep(Duration::new(0, 0))
.unwrap();
sleep.wait().unwrap();
}
#[test]
fn positive_sleep_stream_yields_twice() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let mut stream = timer
.sleep_stream(Duration::from_millis(50))
.unwrap()
.wait();
stream.next().unwrap().unwrap();
stream.next().unwrap().unwrap();
}
#[test]
fn positive_heartbeat_sends_timeout() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let (_send, recv): (_, UnboundedReceiver<TimeoutStatus<()>>) = mpsc::unbounded();
let mut stream = timer
.heartbeat(Duration::from_millis(50), recv)
.unwrap()
.wait();
assert_eq!(TimeoutStatus::TimedOut, stream.next().unwrap().unwrap());
assert_eq!(TimeoutStatus::TimedOut, stream.next().unwrap().unwrap());
}
#[test]
fn positive_heartbeat_sends_item() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let (send, recv): (_, UnboundedReceiver<TimeoutStatus<()>>) = mpsc::unbounded();
send.unbounded_send(TimeoutStatus::Original(())).unwrap();
send.unbounded_send(TimeoutStatus::Original(())).unwrap();
let mut stream = timer
.heartbeat(Duration::from_millis(50), recv)
.unwrap()
.wait();
assert_eq!(TimeoutStatus::Original(()), stream.next().unwrap().unwrap());
assert_eq!(TimeoutStatus::Original(()), stream.next().unwrap().unwrap());
}
#[test]
fn positive_heartbeat_send_item_and_timeout() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let (send, recv): (_, UnboundedReceiver<TimeoutStatus<()>>) = mpsc::unbounded();
send.unbounded_send(TimeoutStatus::Original(())).unwrap();
send.unbounded_send(TimeoutStatus::Original(())).unwrap();
let mut stream = timer
.heartbeat(Duration::from_millis(50), recv)
.unwrap()
.wait();
assert_eq!(TimeoutStatus::Original(()), stream.next().unwrap().unwrap());
assert_eq!(TimeoutStatus::Original(()), stream.next().unwrap().unwrap());
assert_eq!(TimeoutStatus::TimedOut, stream.next().unwrap().unwrap());
}
#[test]
fn positive_timeout_times_out() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let result = timer
.timeout(Duration::from_millis(50), future::empty::<(), TimeoutStatus<()>>())
.unwrap()
.wait();
assert_eq!(TimeoutStatus::TimedOut, result.unwrap_err());
}
#[test]
fn positive_timeout_stream_times_out() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default().build());
let (_send, recv): (_, UnboundedReceiver<()>) = mpsc::unbounded();
let mut stream = timer
.timeout_stream(Duration::from_millis(50), recv.map_err(TimeoutStatus::Original))
.unwrap()
.wait();
assert_eq!(TimeoutStatus::TimedOut, stream.next().unwrap().unwrap_err());
assert_eq!(TimeoutStatus::TimedOut, stream.next().unwrap().unwrap_err());
}
#[test]
fn negative_thread_shutdown_with_existing_sleep() {
let timer = TimerBuilder::default()
.build(HashedWheelBuilder::default()
.with_tick_duration(Duration::from_millis(10000))
.build()
);
let sleep = timer.sleep(Duration::from_millis(100))
.unwrap();
assert_eq!(3, Arc::strong_count(&timer.inner));
thread::sleep(Duration::from_millis(50));
mem::drop(timer);
thread::sleep(Duration::from_millis(50));
assert_eq!(2, Arc::strong_count(&sleep.futures.inner));
}
}