use crate::timer::clock::now;
use crate::timer::driver::Handle;
use crate::timer::wheel::{self, Wheel};
use crate::timer::{Delay, Error};
use futures_core::ready;
use slab::Slab;
use std::cmp;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::{Duration, Instant};
/// A queue of values of type `T`, each stored together with a deadline.
///
/// Values live in a slab and are tracked by index in a timer wheel; they are
/// handed back as [`Expired`] items by `poll_next` as the wheel reports them.
#[derive(Debug)]
pub struct DelayQueue<T> {
/// Timer handle used to create the `Delay` that wakes the queue.
handle: Handle,
/// Backing storage for every entry; a `Key` is an index into this slab.
slab: Slab<Data<T>>,
/// Timer wheel holding the slab indices of entries that are still pending.
wheel: Wheel<Stack<T>>,
/// Entries the wheel rejected as already elapsed at insertion time; these
/// are returned by `poll_idx` before the wheel is consulted.
expired: Stack<T>,
/// Delay armed for the next wake-up, if one is currently set.
delay: Option<Delay>,
/// Poll state passed to `Wheel::poll`.
poll: wheel::Poll,
/// Instant captured when the queue was created; deadlines are stored as
/// millisecond offsets from this instant.
start: Instant,
}
/// An entry taken out of a [`DelayQueue`], either by polling or by `remove`.
#[derive(Debug)]
pub struct Expired<T> {
/// The value that was stored in the queue.
data: T,
/// The entry's deadline, reconstructed as `start + when` milliseconds.
deadline: Instant,
/// Key that identified the entry while it was in the queue.
key: Key,
}
/// Token identifying an entry stored in a [`DelayQueue`].
///
/// Returned by the insert methods and accepted by `remove` / `reset*`.
#[derive(Debug, Clone)]
pub struct Key {
/// Index of the entry in the queue's slab.
index: usize,
}
/// Head of an intrusive, doubly linked list of slab entries.
///
/// The links themselves (`next` / `prev`) live in [`Data`]; the stack stores
/// only the index of its first entry.
#[derive(Debug)]
struct Stack<T> {
/// Slab index of the first entry, or `None` when the stack is empty.
head: Option<usize>,
/// Ties the type parameter `T` to the struct without storing a `T`.
_p: PhantomData<fn() -> T>,
}
/// A single slab entry: the user's value plus its scheduling state.
#[derive(Debug)]
struct Data<T> {
/// The value supplied by the caller.
inner: T,
/// Deadline in milliseconds, measured from the owning queue's `start`.
when: u64,
/// `true` when the entry is linked into the queue's `expired` stack rather
/// than the timer wheel.
expired: bool,
/// Slab index of the next entry in the list this entry is linked into.
next: Option<usize>,
/// Slab index of the previous entry in the list this entry is linked into.
prev: Option<usize>,
}
/// Upper bound on the number of entries a queue may hold (2^30 - 1).
///
/// `insert_at` asserts that the slab holds fewer than this many entries
/// before adding another one.
const MAX_ENTRIES: usize = (1 << 30) - 1;
impl<T> DelayQueue<T> {
    /// Creates an empty queue with no pre-allocated capacity.
    ///
    /// Equivalent to `DelayQueue::with_capacity(0)`.
    pub fn new() -> DelayQueue<T> {
        DelayQueue::with_capacity(0)
    }

    /// Creates an empty queue bound to `handle`, with slab storage reserved
    /// for `capacity` entries.
    ///
    /// The current time (`now()`) is captured as the queue's `start` instant;
    /// every deadline is stored as a millisecond offset from it.
    pub fn with_capacity_and_handle(capacity: usize, handle: &Handle) -> DelayQueue<T> {
        DelayQueue {
            handle: handle.clone(),
            wheel: Wheel::new(),
            slab: Slab::with_capacity(capacity),
            expired: Stack::default(),
            delay: None,
            poll: wheel::Poll::new(0),
            start: now(),
        }
    }

    /// Creates an empty queue with slab storage reserved for `capacity`
    /// entries, bound to `Handle::default()`.
    pub fn with_capacity(capacity: usize) -> DelayQueue<T> {
        DelayQueue::with_capacity_and_handle(capacity, &Handle::default())
    }

    /// Inserts `value` so that it expires at the instant `when`, returning the
    /// [`Key`] that identifies the new entry.
    ///
    /// The deadline is normalized first (see `normalize_deadline`): it is
    /// rounded up to whole milliseconds and clamped so it is never earlier
    /// than the wheel's elapsed time.
    ///
    /// # Panics
    ///
    /// Panics if the queue already holds `MAX_ENTRIES` entries.
    pub fn insert_at(&mut self, value: T, when: Instant) -> Key {
        assert!(self.slab.len() < MAX_ENTRIES, "max entries exceeded");

        let when = self.normalize_deadline(when);
        let key = self.slab.insert(Data {
            inner: value,
            when,
            expired: false,
            next: None,
            prev: None,
        });
        self.insert_idx(when, key);

        // Arm a new delay when none exists, or when the new entry is due
        // before the currently armed one.
        let should_set_delay = if let Some(ref delay) = self.delay {
            let current_exp = self.normalize_deadline(delay.deadline());
            current_exp > when
        } else {
            true
        };
        if should_set_delay {
            self.delay = Some(self.handle.delay(self.start + Duration::from_millis(when)));
        }

        Key::new(key)
    }

    /// Attempts to pull the next expired entry out of the queue.
    ///
    /// Returns whatever `poll_idx` reports, with a ready slab index replaced
    /// by the corresponding [`Expired`] value; the entry is removed from the
    /// slab in the process.
    pub fn poll_next(
        &mut self,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<Expired<T>, Error>>> {
        let item = ready!(self.poll_idx(cx));
        Poll::Ready(item.map(|result| {
            result.map(|idx| {
                let data = self.slab.remove(idx);
                // An index handed out by `poll_idx` has already been unlinked.
                debug_assert!(data.next.is_none());
                debug_assert!(data.prev.is_none());
                Expired {
                    key: Key::new(idx),
                    data: data.inner,
                    deadline: self.start + Duration::from_millis(data.when),
                }
            })
        }))
    }

    /// Inserts `value` so that it expires `timeout` from now.
    ///
    /// Shorthand for `insert_at(value, now() + timeout)`.
    pub fn insert(&mut self, value: T, timeout: Duration) -> Key {
        self.insert_at(value, now() + timeout)
    }

    /// Links the slab entry `key` into the timer wheel at deadline `when`.
    ///
    /// If the wheel reports the deadline as already elapsed, the entry is
    /// flagged `expired` and pushed onto the `expired` stack instead.
    ///
    /// # Panics
    ///
    /// Panics if the wheel rejects the deadline for any other reason.
    fn insert_idx(&mut self, when: u64, key: usize) {
        use self::wheel::{InsertError, Stack};

        match self.wheel.insert(when, key, &mut self.slab) {
            Ok(_) => {}
            Err((_, InsertError::Elapsed)) => {
                self.slab[key].expired = true;
                self.expired.push(key, &mut self.slab);
            }
            Err((_, err)) => panic!("invalid deadline; err={:?}", err),
        }
    }

    /// Unlinks the entry identified by `key` from whichever container holds
    /// it: the `expired` stack when its `expired` flag is set, the timer wheel
    /// otherwise. The entry itself stays in the slab.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the slab.
    fn remove_key(&mut self, key: &Key) {
        use crate::timer::wheel::Stack;

        if self.slab[key.index].expired {
            self.expired.remove(&key.index, &mut self.slab);
        } else {
            self.wheel.remove(&key.index, &mut self.slab);
        }
    }

    /// Removes the entry identified by `key` from the queue and returns it,
    /// whether or not its deadline has been reached.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the slab.
    pub fn remove(&mut self, key: &Key) -> Expired<T> {
        self.remove_key(key);

        let data = self.slab.remove(key.index);
        Expired {
            key: Key::new(key.index),
            data: data.inner,
            deadline: self.start + Duration::from_millis(data.when),
        }
    }

    /// Changes the deadline of the entry identified by `key` to `when`.
    ///
    /// The entry is unlinked from its current container, given the new
    /// (normalized) deadline and inserted again. If a delay is currently
    /// armed, it is reset to the wheel's next deadline.
    ///
    /// # Panics
    ///
    /// Panics if `key` does not refer to an entry in the slab.
    pub fn reset_at(&mut self, key: &Key, when: Instant) {
        // An entry that already elapsed lives in the `expired` stack, not in
        // the wheel, so it has to be unlinked from there.
        self.remove_key(key);

        let when = self.normalize_deadline(when);
        let entry = &mut self.slab[key.index];
        entry.when = when;
        // `insert_idx` sets the flag again if the new deadline is also
        // elapsed; otherwise the entry goes back into the wheel and a later
        // `remove` must look for it there.
        entry.expired = false;
        self.insert_idx(when, key.index);

        let next_deadline = self.next_deadline();
        if let (Some(ref mut delay), Some(deadline)) = (&mut self.delay, next_deadline) {
            delay.reset(deadline);
        }
    }

    /// Returns the instant of the wheel's next deadline, or `None` when the
    /// wheel reports nothing to poll.
    fn next_deadline(&mut self) -> Option<Instant> {
        self.wheel
            .poll_at()
            .map(|poll_at| self.start + Duration::from_millis(poll_at))
    }

    /// Changes the deadline of the entry identified by `key` to `timeout`
    /// from now.
    ///
    /// Shorthand for `reset_at(key, now() + timeout)`.
    pub fn reset(&mut self, key: &Key, timeout: Duration) {
        self.reset_at(key, now() + timeout);
    }

    /// Removes every entry and discards the armed delay.
    ///
    /// The slab is cleared and the `expired` stack and timer wheel are
    /// replaced with fresh, empty ones.
    pub fn clear(&mut self) {
        self.slab.clear();
        self.expired = Stack::default();
        self.wheel = Wheel::new();
        self.delay = None;
    }

    /// Returns the capacity of the backing slab.
    pub fn capacity(&self) -> usize {
        self.slab.capacity()
    }

    /// Reserves slab capacity for `additional` more entries.
    pub fn reserve(&mut self, additional: usize) {
        self.slab.reserve(additional);
    }

    /// Returns `true` when the queue holds no entries.
    pub fn is_empty(&self) -> bool {
        self.slab.is_empty()
    }

    /// Finds the slab index of the next expired entry.
    ///
    /// Entries on the `expired` stack are returned first. Otherwise the armed
    /// delay (if any) is awaited, the wheel is polled at that delay's
    /// deadline, and a new delay is armed for the wheel's next deadline.
    /// Returns `Poll::Ready(None)` once the wheel has no further deadline.
    /// The code visible here only ever produces `Ok` items.
    fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> {
        use self::wheel::Stack;

        if let Some(idx) = self.expired.pop(&mut self.slab) {
            return Poll::Ready(Some(Ok(idx)));
        }

        loop {
            if let Some(ref mut delay) = self.delay {
                if !delay.is_elapsed() {
                    ready!(Pin::new(&mut *delay).poll(cx));
                }

                // Advance the wheel's notion of "now" to the deadline that
                // just fired, expressed in milliseconds since `start`.
                let now =
                    crate::timer::ms(delay.deadline() - self.start, crate::timer::Round::Down);
                self.poll = wheel::Poll::new(now);
            }
            self.delay = None;

            if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) {
                return Poll::Ready(Some(Ok(idx)));
            }

            if let Some(deadline) = self.next_deadline() {
                self.delay = Some(self.handle.delay(deadline));
            } else {
                return Poll::Ready(None);
            }
        }
    }

    /// Converts `when` into milliseconds since `start`, rounded up.
    ///
    /// Instants before `start` map to `0`, and the result is never smaller
    /// than the wheel's elapsed time.
    fn normalize_deadline(&self, when: Instant) -> u64 {
        let when = if when < self.start {
            0
        } else {
            crate::timer::ms(when - self.start, crate::timer::Round::Up)
        };
        cmp::max(when, self.wheel.elapsed())
    }
}
// `DelayQueue<T>` is declared `Unpin` for every `T`, with no bound on `T`.
// NOTE(review): presumably sound because the queue never pins the values it
// stores — confirm against the `Delay` and `Wheel` internals.
impl<T> Unpin for DelayQueue<T> {}
impl<T> futures_core::Stream for DelayQueue<T> {
    /// Each item is an expired entry, or a timer error.
    type Item = Result<Expired<T>, Error>;

    /// Unwraps the pinned reference and forwards to the inherent
    /// `DelayQueue::poll_next`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        let queue: &mut DelayQueue<T> = self.get_mut();
        DelayQueue::poll_next(queue, cx)
    }
}
impl<T> Default for DelayQueue<T> {
fn default() -> DelayQueue<T> {
DelayQueue::new()
}
}
impl<T> wheel::Stack for Stack<T> {
    /// Entries are identified by their slab index.
    type Owned = usize;
    type Borrowed = usize;
    /// The slab that owns the entries and their `next` / `prev` links.
    type Store = Slab<Data<T>>;

    /// Returns `true` when no entry is linked into this stack.
    fn is_empty(&self) -> bool {
        self.head.is_none()
    }

    /// Links `item` in as the new head of the stack.
    fn push(&mut self, item: Self::Owned, store: &mut Self::Store) {
        // The entry must not be linked into any list yet.
        debug_assert!(store[item].next.is_none());
        debug_assert!(store[item].prev.is_none());

        let previous_head = self.head.take();
        if let Some(head_idx) = previous_head {
            store[head_idx].prev = Some(item);
        }
        store[item].next = previous_head;
        self.head = Some(item);
    }

    /// Unlinks and returns the head entry, or `None` when the stack is empty.
    fn pop(&mut self, store: &mut Self::Store) -> Option<Self::Owned> {
        let idx = self.head?;

        // Promote the successor (if any) to head and cut its back link.
        let successor = store[idx].next;
        self.head = successor;
        if let Some(new_head) = successor {
            store[new_head].prev = None;
        }

        store[idx].next = None;
        debug_assert!(store[idx].prev.is_none());
        Some(idx)
    }

    /// Unlinks `item` from anywhere in the stack.
    ///
    /// # Panics
    ///
    /// Panics if `item` is not present in `store`. In debug builds it also
    /// asserts that `item` is linked into this stack exactly once.
    fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store) {
        assert!(store.contains(*item));
        debug_assert!({
            let mut next = self.head;
            let mut contains = false;
            while let Some(idx) = next {
                if idx == *item {
                    debug_assert!(!contains);
                    contains = true;
                }
                next = store[idx].next;
            }
            contains
        });

        let target = *item;
        // Snapshot both links before rewiring the neighbours.
        let (before, after) = {
            let entry = &store[target];
            (entry.prev, entry.next)
        };

        if let Some(next_idx) = after {
            store[next_idx].prev = before;
        }
        match before {
            Some(prev_idx) => store[prev_idx].next = after,
            // No predecessor: the entry was the head.
            None => self.head = after,
        }

        let entry = &mut store[target];
        entry.next = None;
        entry.prev = None;
    }

    /// Returns the deadline stored for `item`.
    fn when(item: &Self::Borrowed, store: &Self::Store) -> u64 {
        let entry = &store[*item];
        entry.when
    }
}
impl<T> Default for Stack<T> {
fn default() -> Stack<T> {
Stack {
head: None,
_p: PhantomData,
}
}
}
impl Key {
pub(crate) fn new(index: usize) -> Key {
Key { index }
}
}
impl<T> Expired<T> {
pub fn get_ref(&self) -> &T {
&self.data
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.data
}
pub fn into_inner(self) -> T {
self.data
}
}