use std::collections::HashMap;
use std::future::Future;
use std::hash::Hash;
use std::mem::replace;
use std::task::{Context, Poll};
use futures::future::poll_fn;
use tokio::time::delay_queue::Key as DelayQueueKey;
use tokio::time::{DelayQueue, Error, Instant};
/// Records whether a queued entry currently holds its data.
enum InternalDataRef<Data> {
/// The entry holds the data itself, boxed.
Owned(Box<Data>),
/// The entry holds no data.
// NOTE(review): presumably the data then lives with whoever enqueued the key — confirm against callers.
References,
}
/// A commit handed back by the queue together with the deadline it was scheduled for.
pub(crate) enum EnqueuedCommit<Key, Data> {
    /// The queue held the data for this key; it is returned alongside the key.
    Owned(Key, Data, Instant),
    /// The queue held no data for this key; only the key is returned.
    References(Key, Instant),
}

impl<Key, Data> EnqueuedCommit<Key, Data> {
    /// Returns the deadline stored with this commit.
    pub fn at(&self) -> Instant {
        match self {
            Self::Owned(_, _, deadline) | Self::References(_, deadline) => *deadline,
        }
    }

    /// Borrows the key of this commit.
    pub fn key(&self) -> &Key {
        match self {
            Self::Owned(key, ..) | Self::References(key, ..) => key,
        }
    }

    /// Consumes the commit and returns its key, dropping any data it carried.
    pub fn into_key(self) -> Key {
        self.into_inner().0
    }

    /// Borrows the data if this commit carries it, `None` otherwise.
    pub fn data(&self) -> Option<&Data> {
        if let Self::Owned(_, data, _) = self {
            Some(data)
        } else {
            None
        }
    }

    /// Consumes the commit and returns the data if it carried any, dropping the key.
    pub fn into_data(self) -> Option<Data> {
        self.into_inner().1
    }

    /// Consumes the commit and splits it into its key and optional data; the deadline is
    /// discarded.
    pub fn into_inner(self) -> (Key, Option<Data>) {
        match self {
            Self::Owned(key, data, _) => (key, Some(data)),
            Self::References(key, _) => (key, None),
        }
    }
}
/// Per-key bookkeeping kept while a key is waiting in the queue.
struct InternalEnqueuedPersist<Data> {
// Handle used to reset or remove this key's entry in the `DelayQueue`.
delay_queue_key: DelayQueueKey,
// Whether the queue currently holds the data for this key.
internal_data_ref: InternalDataRef<Data>,
// Deadline currently recorded for this key.
at: Instant,
}
impl<Data> InternalEnqueuedPersist<Data> {
    /// Converts the internal bookkeeping into the crate-visible [`EnqueuedCommit`] for `key`.
    ///
    /// Owned data is moved out of its box; the delay-queue handle is discarded.
    #[inline]
    fn into_enqueued_persist<Key>(self, key: Key) -> EnqueuedCommit<Key, Data> {
        let Self {
            internal_data_ref,
            at,
            ..
        } = self;
        match internal_data_ref {
            InternalDataRef::Owned(boxed) => EnqueuedCommit::Owned(key, *boxed, at),
            InternalDataRef::References => EnqueuedCommit::References(key, at),
        }
    }
}
/// Queue of keys scheduled to be committed at a deadline, with at most one entry per key.
pub(crate) struct CommitQueue<Key, Data> {
// Timer structure that yields keys once their deadline has passed.
delay_queue: DelayQueue<Key>,
// Per-key index holding the delay-queue handle, the deadline and the optional owned data.
delays_by_key: HashMap<Key, InternalEnqueuedPersist<Data>>,
}
/// Outcome of scheduling a key with `CommitQueue::persist_at`.
///
/// Derives `Debug`/`PartialEq` so results can be logged and compared directly
/// (e.g. with `assert_eq!`) instead of being matched by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PersistResult {
    /// The key was not in the queue and has been scheduled.
    Enqueued,
    /// The key was already queued with a later deadline and has been moved to the new,
    /// earlier one. Carries the previous deadline.
    ReEnqueued(Instant),
    /// The key was already queued with a deadline no later than the requested one; nothing
    /// changed. Carries the deadline that stays in effect.
    AlreadyEnqueued(Instant),
}
/// Outcome of handing data to the queue with `CommitQueue::give_ownership`.
///
/// Derives `Debug`/`PartialEq` (available whenever `Key` and `Data` implement them) so
/// results can be logged and compared directly.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum GiveOwnershipResult<Key, Data> {
    /// The key is not in the queue; the key and data are handed back to the caller.
    NotEnqueued(Key, Data),
    /// The queue already holds data for this key; the key and the new data are handed back.
    AlreadyOwned(Key, Data),
    /// The queue now holds the data for this key.
    Transferred,
}
/// Outcome of reclaiming data from the queue with `CommitQueue::take_ownership`.
///
/// Derives `Debug`/`PartialEq` (available whenever `Data` implements them) so results can
/// be logged and compared directly.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum TakeOwnershipResult<Data> {
    /// The key is not in the queue.
    NotEnqueued,
    /// The queue held the data for this key and has given it back.
    Transferred(Data),
    /// The key is in the queue, but the queue held no data for it.
    NotOwned,
}
impl<Key: Clone + Hash + Eq, Data> CommitQueue<Key, Data> {
pub(crate) fn new() -> Self {
Self::with_capacity(0)
}
pub(crate) fn with_capacity(capacity: usize) -> Self {
Self {
delay_queue: DelayQueue::with_capacity(capacity),
delays_by_key: HashMap::with_capacity(capacity),
}
}
#[inline]
pub(crate) fn len(&self) -> usize {
self.delay_queue.len()
}
#[inline]
pub(crate) fn get_persist_at(&self, key: &Key) -> Option<Instant> {
self.delays_by_key
.get(key)
.map(|enqueued_persist| enqueued_persist.at)
}
pub(crate) fn persist_at(&mut self, key: &Key, deadline: Instant) -> PersistResult {
match self.delays_by_key.get_mut(key) {
Some(internal_enqueued_persist) if deadline < internal_enqueued_persist.at => {
self.delay_queue
.reset_at(&internal_enqueued_persist.delay_queue_key, deadline);
let prev_deadline = internal_enqueued_persist.at;
internal_enqueued_persist.at = deadline;
PersistResult::ReEnqueued(prev_deadline)
}
Some(internal_enqueued_persist) => {
PersistResult::AlreadyEnqueued(internal_enqueued_persist.at)
}
None => {
let delay_queue_key = self.delay_queue.insert_at(key.clone(), deadline);
self.delays_by_key.insert(
key.clone(),
InternalEnqueuedPersist {
at: deadline,
delay_queue_key,
internal_data_ref: InternalDataRef::References,
},
);
PersistResult::Enqueued
}
}
}
pub(crate) fn give_ownership(
&mut self,
key: Key,
data: Data,
) -> GiveOwnershipResult<Key, Data> {
if let Some(enqueued_persist) = self.delays_by_key.get_mut(&key) {
match &mut enqueued_persist.internal_data_ref {
references @ InternalDataRef::References => {
*references = InternalDataRef::Owned(Box::new(data));
GiveOwnershipResult::Transferred
}
InternalDataRef::Owned(_) => GiveOwnershipResult::AlreadyOwned(key, data),
}
} else {
GiveOwnershipResult::NotEnqueued(key, data)
}
}
pub(crate) fn take_ownership(&mut self, key: &Key) -> TakeOwnershipResult<Data> {
if let Some(enqueued_persist) = self.delays_by_key.get_mut(&key) {
let prev_data_ref = replace(
&mut enqueued_persist.internal_data_ref,
InternalDataRef::References,
);
match prev_data_ref {
InternalDataRef::Owned(data) => TakeOwnershipResult::Transferred(*data),
InternalDataRef::References => TakeOwnershipResult::NotOwned,
}
} else {
TakeOwnershipResult::NotEnqueued
}
}
pub(crate) fn cancel(&mut self, key: &Key) -> Option<EnqueuedCommit<Key, Data>> {
match self.delays_by_key.remove_entry(key) {
None => None,
Some((key, internal_enqueued_persist)) => {
self.delay_queue
.remove(&internal_enqueued_persist.delay_queue_key);
Some(internal_enqueued_persist.into_enqueued_persist(key))
}
}
}
#[inline]
pub(crate) fn poll_ready(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<EnqueuedCommit<Key, Data>, Error>> {
match self.delay_queue.poll_expired(cx) {
Poll::Ready(Some(entry)) => {
let key = entry?.into_inner();
let internal_enqueued_persist = self.delays_by_key.remove(&key).expect(
"data inconsistency: poll_ready returned a key not contained in delays_by_key",
);
Poll::Ready(Ok(internal_enqueued_persist.into_enqueued_persist(key)))
}
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending,
}
}
#[inline]
pub(crate) fn next<'a>(
&'a mut self,
) -> impl Future<Output = Result<EnqueuedCommit<Key, Data>, Error>> + 'a {
poll_fn(move |cx| self.poll_ready(cx))
}
pub(crate) fn consume(self) -> Vec<EnqueuedCommit<Key, Data>> {
self.delays_by_key
.into_iter()
.map(|(key, internal_enqueued_persist)| {
internal_enqueued_persist.into_enqueued_persist(key)
})
.collect()
}
}
/// Returns a future that polls `commit_queue` for its next expired entry.
///
/// When `commit_queue` is `None` the future is pending on every poll.
#[inline]
pub(crate) fn poll_option_ready<'a, Key: Clone + Hash + Eq, Data>(
    mut commit_queue: Option<&'a mut CommitQueue<Key, Data>>,
) -> impl Future<Output = Result<EnqueuedCommit<Key, Data>, Error>> + 'a {
    poll_fn(move |cx| match commit_queue.as_mut() {
        Some(queue) => queue.poll_ready(cx),
        None => Poll::Pending,
    })
}
// Tests for `CommitQueue`: scheduling, expiry order, draining and data ownership.
#[cfg(test)]
mod test {
use super::*;
use tokio::time::{Duration, Instant};
// Zero-sized payload; the tests only care whether data is present, not what it contains.
struct Data;
// `persist_at` keeps the earliest deadline per key and `cancel` removes the entry.
#[tokio::test]
async fn test_persist_queue_persist_at() {
let mut queue: CommitQueue<u32, Data> = CommitQueue::new();
let now = Instant::now();
// First request for a key enqueues it.
match queue.persist_at(&1, now + Duration::from_secs(2)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
assert_eq!(queue.get_persist_at(&1), Some(now + Duration::from_secs(2)));
// A later deadline does not replace the existing one.
match queue.persist_at(&1, now + Duration::from_secs(3)) {
PersistResult::AlreadyEnqueued(when) => {
assert_eq!(when, now + Duration::from_secs(2));
}
_ => unreachable!(),
}
assert_eq!(queue.get_persist_at(&1), Some(now + Duration::from_secs(2)));
// An earlier deadline replaces the existing one and reports the previous deadline.
match queue.persist_at(&1, now + Duration::from_secs(1)) {
PersistResult::ReEnqueued(when) => {
assert_eq!(when, now + Duration::from_secs(2));
}
_ => unreachable!(),
}
assert_eq!(queue.get_persist_at(&1), Some(now + Duration::from_secs(1)));
assert_eq!(queue.len(), 1);
// Cancelling returns the entry once; afterwards the key is gone.
let data_ref = queue.cancel(&1).unwrap();
assert_eq!(data_ref.at(), now + Duration::from_secs(1));
assert!(queue.cancel(&1).is_none());
assert!(queue.get_persist_at(&1).is_none());
assert_eq!(queue.len(), 0);
}
// `next` yields keys in deadline order, regardless of insertion order.
#[tokio::test]
async fn test_next() {
let mut queue: CommitQueue<u32, Data> = CommitQueue::new();
let now = Instant::now();
match queue.persist_at(&1, now + Duration::from_millis(50)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
match queue.persist_at(&3, now + Duration::from_millis(150)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
match queue.persist_at(&2, now + Duration::from_millis(100)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
assert_eq!(queue.len(), 3);
assert_eq!(queue.next().await.unwrap().into_key(), 1);
assert_eq!(queue.len(), 2);
assert_eq!(queue.next().await.unwrap().into_key(), 2);
assert_eq!(queue.len(), 1);
assert_eq!(queue.next().await.unwrap().into_key(), 3);
assert_eq!(queue.len(), 0);
// Once the queue is empty, `next` must stay pending instead of resolving.
let mut task = tokio_test::task::spawn(queue.next());
match task.poll() {
std::task::Poll::Pending => {}
_ => panic!("poll was ready?"),
};
}
// `consume` returns every enqueued entry without waiting for its deadline.
#[tokio::test]
async fn test_consume() {
let mut queue: CommitQueue<u32, Data> = CommitQueue::new();
let now = Instant::now();
match queue.persist_at(&1, now + Duration::from_millis(50)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
match queue.persist_at(&3, now + Duration::from_millis(150)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
match queue.persist_at(&2, now + Duration::from_millis(100)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
let mut consumed = queue.consume();
assert_eq!(consumed.len(), 3);
// `consume` iterates a `HashMap`, so sort by deadline to get a deterministic order.
consumed.sort_by_key(|k| k.at().into_std());
assert_eq!(
consumed.iter().map(|k| k.at()).collect::<Vec<_>>(),
vec![
now + Duration::from_millis(50),
now + Duration::from_millis(100),
now + Duration::from_millis(150)
]
);
assert_eq!(
consumed
.into_iter()
.map(|k| k.into_key())
.collect::<Vec<_>>(),
vec![1, 2, 3]
);
}
// Data can be given to and taken from an enqueued key; expiry returns whatever is held.
#[tokio::test]
async fn test_ownership() {
let mut queue: CommitQueue<u32, Data> = CommitQueue::new();
// Neither direction works for a key that is not enqueued.
match queue.give_ownership(1, Data {}) {
GiveOwnershipResult::NotEnqueued(_, _) => {}
_ => unreachable!(),
}
match queue.take_ownership(&1) {
TakeOwnershipResult::NotEnqueued => {}
_ => unreachable!(),
}
let now = Instant::now();
match queue.persist_at(&1, now + Duration::from_millis(50)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
// A freshly enqueued key holds no data.
match queue.take_ownership(&1) {
TakeOwnershipResult::NotOwned => {}
_ => unreachable!(),
}
match queue.give_ownership(1, Data {}) {
GiveOwnershipResult::Transferred => {}
_ => unreachable!(),
}
// A second give is rejected while the entry already holds data.
match queue.give_ownership(1, Data {}) {
GiveOwnershipResult::AlreadyOwned(_, _) => {}
_ => unreachable!(),
}
match queue.take_ownership(&1) {
TakeOwnershipResult::Transferred(_) => {}
_ => unreachable!(),
}
// After taking the data back, the entry holds nothing again.
match queue.take_ownership(&1) {
TakeOwnershipResult::NotOwned => {}
_ => unreachable!(),
}
// The data was taken back before expiry, so the expired item carries none.
let item = queue.next().await.unwrap();
assert_eq!(item.key(), &1);
assert!(item.data().is_none());
match queue.persist_at(&2, now + Duration::from_millis(50)) {
PersistResult::Enqueued => {}
_ => unreachable!(),
}
match queue.give_ownership(2, Data {}) {
GiveOwnershipResult::Transferred => {}
_ => unreachable!(),
}
// The data was left in the queue, so the expired item carries it.
let item = queue.next().await.unwrap();
assert_eq!(item.key(), &2);
assert!(item.data().is_some());
}
}