use crate::atomic::{Atomic, AtomicU64};
use crate::bee::Worker;
use crate::hive::Task;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};
#[derive(Debug)]
pub struct RetryQueue<W: Worker> {
inner: UnsafeCell<BinaryHeap<DelayedTask<W>>>,
delay_factor: AtomicU64,
}
impl<W: Worker> RetryQueue<W> {
pub fn new(delay_factor: u64) -> Self {
Self {
inner: UnsafeCell::new(BinaryHeap::new()),
delay_factor: AtomicU64::new(delay_factor),
}
}
pub fn set_delay_factor(&self, delay_factor: u64) {
self.delay_factor.set(delay_factor);
}
pub fn try_push(&self, task: Task<W>) -> Result<Instant, Task<W>> {
unsafe {
match self.inner.get().as_mut() {
Some(queue) => {
let delay = 2u64
.checked_pow(task.meta.attempt() as u32 - 1)
.and_then(|multiplier| {
self.delay_factor
.get()
.checked_mul(multiplier)
.or(Some(u64::MAX))
.map(Duration::from_nanos)
})
.unwrap_or_default();
let delayed = DelayedTask::new(task, delay);
let until = delayed.until;
queue.push(delayed);
Ok(until)
}
None => Err(task),
}
}
}
pub fn try_pop(&self) -> Option<Task<W>> {
unsafe {
let queue_ptr = self.inner.get();
if queue_ptr
.as_ref()
.and_then(|queue| queue.peek())
.map(|head| head.until <= Instant::now())
.unwrap_or(false)
{
queue_ptr
.as_mut()
.and_then(|queue| queue.pop())
.map(|delayed| delayed.value)
} else {
None
}
}
}
pub fn drain_into(self, sink: &mut Vec<Task<W>>) {
let mut queue = self.inner.into_inner();
sink.reserve(queue.len());
sink.extend(queue.drain().map(|delayed| delayed.value))
}
}
unsafe impl<W: Worker> Sync for RetryQueue<W> {}
struct DelayedTask<W: Worker> {
value: Task<W>,
until: Instant,
}
impl<W: Worker> DelayedTask<W> {
pub fn new(value: Task<W>, delay: Duration) -> Self {
Self {
value,
until: Instant::now() + delay,
}
}
}
impl<W: Worker> Ord for DelayedTask<W> {
fn cmp(&self, other: &DelayedTask<W>) -> Ordering {
other.until.cmp(&self.until)
}
}
impl<W: Worker> PartialOrd for DelayedTask<W> {
fn partial_cmp(&self, other: &DelayedTask<W>) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<W: Worker> PartialEq for DelayedTask<W> {
fn eq(&self, other: &DelayedTask<W>) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<W: Worker> Eq for DelayedTask<W> {}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use super::{RetryQueue, Task, Worker};
use crate::bee::stock::EchoWorker;
use crate::bee::{TaskId, TaskMeta};
use std::{thread, time::Duration};
type TestWorker = EchoWorker<usize>;
const DELAY: u64 = Duration::from_secs(1).as_nanos() as u64;
impl<W: Worker> RetryQueue<W> {
fn len(&self) -> usize {
unsafe { self.inner.get().as_ref().unwrap().len() }
}
}
impl<W: Worker> Task<W> {
fn with_attempt(task_id: TaskId, input: W::Input, attempt: u8) -> Self {
Self {
input,
meta: TaskMeta::with_attempt(task_id, attempt),
outcome_tx: None,
}
}
}
#[test]
fn test_works() {
let queue = RetryQueue::<TestWorker>::new(DELAY);
let task1 = Task::with_attempt(1, 1, 1);
let task2 = Task::with_attempt(2, 2, 2);
let task3 = Task::with_attempt(3, 3, 3);
queue.try_push(task1.clone()).unwrap();
queue.try_push(task2.clone()).unwrap();
queue.try_push(task3.clone()).unwrap();
assert_eq!(queue.len(), 3);
assert_eq!(queue.try_pop(), None);
thread::sleep(Duration::from_secs(1));
assert_eq!(queue.try_pop(), Some(task1));
assert_eq!(queue.len(), 2);
thread::sleep(Duration::from_secs(1));
assert_eq!(queue.try_pop(), Some(task2));
assert_eq!(queue.len(), 1);
thread::sleep(Duration::from_secs(2));
assert_eq!(queue.try_pop(), Some(task3));
assert_eq!(queue.len(), 0);
assert_eq!(queue.try_pop(), None);
}
#[test]
fn test_into_vec() {
let queue = RetryQueue::<TestWorker>::new(DELAY);
let task1 = Task::with_attempt(1, 1, 1);
let task2 = Task::with_attempt(2, 2, 2);
let task3 = Task::with_attempt(3, 3, 3);
queue.try_push(task1.clone()).unwrap();
queue.try_push(task2.clone()).unwrap();
queue.try_push(task3.clone()).unwrap();
let mut v = Vec::new();
queue.drain_into(&mut v);
v.sort();
assert_eq!(v, vec![task1, task2, task3]);
}
}