#[cfg(feature = "google-books")]
use std::cmp::Reverse;
#[cfg(feature = "google-books")]
use std::collections::BinaryHeap;
#[cfg(feature = "google-books")]
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
#[cfg(feature = "google-books")]
use std::sync::Arc;
#[cfg(feature = "google-books")]
use std::time::{Duration, Instant};
#[cfg(feature = "google-books")]
use chrono::{DateTime, Utc};
#[cfg(feature = "google-books")]
use tokio::sync::{mpsc, watch, Mutex};
/// A parsed retry hint: either a relative delay or an absolute deadline.
///
/// NOTE(review): the name and the accepted formats suggest this models the
/// HTTP `Retry-After` header — confirm against the caller that feeds `parse`.
#[cfg(feature = "google-books")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RetryAfter {
/// Relative delay expressed in whole seconds.
Seconds(u64),
/// Absolute point in time (normalised to UTC) before which no retry should happen.
DateTime(DateTime<Utc>),
}
#[cfg(feature = "google-books")]
impl RetryAfter {
    /// Parses a retry hint from its textual form.
    ///
    /// Surrounding whitespace is ignored. Accepted forms, tried in order:
    /// 1. a non-negative integer, interpreted as seconds;
    /// 2. an RFC 2822 date (e.g. `Wed, 21 Oct 2015 07:28:00 GMT`);
    /// 3. an RFC 3339 timestamp (e.g. `2024-01-15T10:30:00Z`).
    ///
    /// Returns `None` when the value matches none of these.
    pub fn parse(value: &str) -> Option<Self> {
        let value = value.trim();
        if let Ok(seconds) = value.parse::<u64>() {
            return Some(RetryAfter::Seconds(seconds));
        }
        DateTime::parse_from_rfc2822(value)
            .or_else(|_| DateTime::parse_from_rfc3339(value))
            .ok()
            .map(|dt| RetryAfter::DateTime(dt.with_timezone(&Utc)))
    }

    /// Returns how long to wait from *now* before retrying.
    ///
    /// For [`RetryAfter::DateTime`] the result is the remaining time until the
    /// deadline with millisecond resolution; deadlines that are not in the
    /// future yield `Duration::ZERO`.
    pub fn to_duration(&self) -> Duration {
        match self {
            RetryAfter::Seconds(secs) => Duration::from_secs(*secs),
            RetryAfter::DateTime(deadline) => {
                let now = Utc::now();
                if *deadline <= now {
                    return Duration::ZERO;
                }
                let remaining_ms = (*deadline - now).num_milliseconds();
                // `max(0)` makes the value non-negative, so the cast cannot wrap.
                Duration::from_millis(remaining_ms.max(0) as u64)
            }
        }
    }

    /// Returns the monotonic instant at which the retry becomes due.
    ///
    /// The hint typically comes from a remote peer, so the delay may be
    /// arbitrarily large. `Instant + Duration` panics on overflow; this method
    /// instead falls back to one year from now (and, should even that be
    /// unrepresentable, to the current instant).
    pub fn to_instant(&self) -> Instant {
        // Upper bound used only when the requested delay cannot be represented.
        const FALLBACK_DELAY: Duration = Duration::from_secs(365 * 24 * 60 * 60);

        let now = Instant::now();
        now.checked_add(self.to_duration())
            .or_else(|| now.checked_add(FALLBACK_DELAY))
            .unwrap_or(now)
    }
}
/// Retry limit consulted by `Job::can_retry`: a job may be retried while its
/// `attempt` counter is strictly below this value.
#[cfg(feature = "google-books")]
pub const MAX_RETRIES: u8 = 5;
/// Value, in milliseconds, that `Job::new` stores in `Job::backoff_ms`.
#[cfg(feature = "google-books")]
pub const INITIAL_BACKOFF_MS: u64 = 1000;
/// A unit of work handed out by the task manager.
///
/// The string fields are reference-counted, so cloning a job (or deriving a
/// retry from it) does not copy the underlying text.
#[cfg(feature = "google-books")]
#[derive(Clone, Debug)]
pub struct Job {
/// URL associated with the job (presumably the resource to fetch — confirm with the consumer).
pub url: Arc<str>,
/// Caller-supplied prefix; its meaning is not defined in this file.
pub prefix: Arc<str>,
/// Caller-supplied ordering value; not interpreted anywhere in this file.
pub order: u8,
/// Number of retries already derived for this job; `0` for a fresh job.
pub attempt: u8,
/// Per-job backoff in milliseconds; starts at `INITIAL_BACKOFF_MS` and is doubled by `with_retry`.
pub backoff_ms: u64,
}
#[cfg(feature = "google-books")]
impl Job {
    /// Creates a fresh job with `attempt == 0` and the initial backoff.
    pub fn new(url: impl Into<Arc<str>>, prefix: impl Into<Arc<str>>, order: u8) -> Self {
        Self {
            url: url.into(),
            prefix: prefix.into(),
            order,
            attempt: 0,
            backoff_ms: INITIAL_BACKOFF_MS,
        }
    }

    /// Returns a copy of this job prepared for its next retry.
    ///
    /// The attempt counter is incremented and the backoff doubled. Both
    /// operations saturate: a plain `+ 1` would panic in debug builds and wrap
    /// to `0` in release builds at `u8::MAX`, which would make
    /// [`Job::can_retry`] report `true` again.
    pub fn with_retry(&self) -> Self {
        Self {
            // Cheap: only the reference counts are bumped.
            url: Arc::clone(&self.url),
            prefix: Arc::clone(&self.prefix),
            order: self.order,
            attempt: self.attempt.saturating_add(1),
            backoff_ms: self.backoff_ms.saturating_mul(2),
        }
    }

    /// Returns `true` while fewer than [`MAX_RETRIES`] retries have been used.
    pub fn can_retry(&self) -> bool {
        self.attempt < MAX_RETRIES
    }
}
/// A job waiting in the retry queue together with the instant it becomes due.
#[cfg(feature = "google-books")]
struct RetryEntry {
/// Earliest instant at which `job` may be handed out again.
ready_at: Instant,
/// The job to retry.
job: Job,
}
// Equality and ordering of retry entries look at `ready_at` only; the job
// payload never takes part in comparisons.
#[cfg(feature = "google-books")]
impl PartialEq for RetryEntry {
    fn eq(&self, other: &Self) -> bool {
        self.ready_at.eq(&other.ready_at)
    }
}

#[cfg(feature = "google-books")]
impl Eq for RetryEntry {}

#[cfg(feature = "google-books")]
impl PartialOrd for RetryEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Total order: always delegate to `Ord`.
        Some(Ord::cmp(self, other))
    }
}

#[cfg(feature = "google-books")]
impl Ord for RetryEntry {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.ready_at, &other.ready_at)
    }
}
/// Priority queue of pending retries, ordered so the earliest `ready_at` is on top.
#[cfg(feature = "google-books")]
struct RetryQueue {
/// `BinaryHeap` is a max-heap; wrapping entries in `Reverse` makes the
/// smallest `ready_at` the top element.
heap: BinaryHeap<Reverse<RetryEntry>>,
}
#[cfg(feature = "google-books")]
impl RetryQueue {
    /// Creates an empty queue.
    fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
        }
    }

    /// Enqueues `job` so that it becomes available at `ready_at`.
    fn push(&mut self, job: Job, ready_at: Instant) {
        self.heap.push(Reverse(RetryEntry { ready_at, job }));
    }

    /// Returns the due time of the earliest entry, or `None` when empty.
    fn peek_ready_at(&self) -> Option<Instant> {
        self.heap.peek().map(|Reverse(entry)| entry.ready_at)
    }

    /// Removes and returns the earliest job if its due time has been reached.
    fn pop_if_ready(&mut self) -> Option<Job> {
        let is_due = self.peek_ready_at()? <= Instant::now();
        if is_due {
            self.heap.pop().map(|Reverse(entry)| entry.job)
        } else {
            None
        }
    }

    /// Number of queued retries.
    fn len(&self) -> usize {
        self.heap.len()
    }

    /// `true` when no retries are queued.
    fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Time remaining until the earliest entry is due.
    ///
    /// Returns `None` only when the queue is empty. An entry that is already
    /// overdue yields `Some(Duration::ZERO)`; reporting `None` for it would be
    /// indistinguishable from "no retries pending" and can leave a due retry
    /// unserved while the caller waits for other work.
    fn time_until_next(&self) -> Option<Duration> {
        self.peek_ready_at()
            .map(|ready_at| ready_at.saturating_duration_since(Instant::now()))
    }
}
/// Tunables for retry scheduling.
#[cfg(feature = "google-books")]
#[derive(Debug, Clone)]
pub struct TaskManagerConfig {
/// Configured retry limit. NOTE(review): nothing in this file reads this
/// field; `Job::can_retry` uses the `MAX_RETRIES` constant instead.
pub max_retries: u8,
/// Backoff for attempt 0, in milliseconds; doubled per attempt by `compute_backoff`.
pub initial_backoff_ms: u64,
/// Cap, in milliseconds, applied to the exponential backoff before jitter is added.
pub max_backoff_ms: u64,
/// Fraction of the capped backoff used as the +/- jitter range; `new` clamps it to `[0.0, 1.0]`.
pub jitter_factor: f64,
}
#[cfg(feature = "google-books")]
impl Default for TaskManagerConfig {
    /// Defaults: 5 retries, 1 s initial backoff, 60 s cap, 25 % jitter.
    fn default() -> Self {
        // Argument order: max_retries, initial_backoff_ms, max_backoff_ms, jitter_factor.
        Self::new(5, 1000, 60_000, 0.25)
    }
}
#[cfg(feature = "google-books")]
impl TaskManagerConfig {
    /// Builds a configuration, forcing `jitter_factor` into `[0.0, 1.0]`.
    ///
    /// A NaN factor is replaced by `0.0` (no jitter); `f64::clamp` alone would
    /// let NaN through.
    pub fn new(
        max_retries: u8,
        initial_backoff_ms: u64,
        max_backoff_ms: u64,
        jitter_factor: f64,
    ) -> Self {
        Self {
            max_retries,
            initial_backoff_ms,
            max_backoff_ms,
            jitter_factor: Self::sanitize_jitter(jitter_factor),
        }
    }

    /// Computes the delay before retry number `attempt` (0-based).
    ///
    /// The base delay is `initial_backoff_ms * 2^min(attempt, 10)`, capped at
    /// `max_backoff_ms`. A uniformly random offset of up to
    /// `+/- jitter_factor * capped` is then added, so the result may exceed
    /// `max_backoff_ms` by at most that fraction.
    ///
    /// `jitter_factor` is a public field and may have been set without going
    /// through [`TaskManagerConfig::new`], so it is sanitised again here; a NaN
    /// factor would otherwise collapse every backoff to zero.
    pub fn compute_backoff(&self, attempt: u8) -> Duration {
        // The exponent is limited to 10 so the shift itself can never overflow.
        let multiplier = 1u64 << attempt.min(10);
        let capped = self
            .initial_backoff_ms
            .saturating_mul(multiplier)
            .min(self.max_backoff_ms);

        let factor = Self::sanitize_jitter(self.jitter_factor);
        if factor == 0.0 {
            // No jitter requested: return the exact integer value.
            return Duration::from_millis(capped);
        }

        let base = capped as f64;
        // `random::<f64>()` is in [0, 1); rescale to [-1, 1).
        let offset = base * factor * (rand::random::<f64>() * 2.0 - 1.0);
        // Float-to-integer `as` casts saturate, so this cannot wrap.
        Duration::from_millis((base + offset).max(0.0) as u64)
    }

    /// Maps any `f64` to a usable jitter factor in `[0.0, 1.0]` (NaN becomes `0.0`).
    fn sanitize_jitter(factor: f64) -> f64 {
        if factor.is_nan() {
            0.0
        } else {
            factor.clamp(0.0, 1.0)
        }
    }
}
/// Lock-free counters shared between the task manager and its submitters.
#[cfg(feature = "google-books")]
#[derive(Debug)]
pub struct TaskManagerMetrics {
/// Jobs submitted through a `TaskSubmitter` and not yet handed out by `get_next_task`.
pub regular_queue_depth: AtomicUsize,
/// Length of the retry queue as of its last push or pop.
pub retry_queue_depth: AtomicUsize,
/// Number of `schedule_retry` calls.
pub total_retries: AtomicU64,
/// Number of `record_failure` calls.
pub total_failures: AtomicU64,
/// Number of `record_success` calls.
pub total_successes: AtomicU64,
}
#[cfg(feature = "google-books")]
impl Default for TaskManagerMetrics {
    /// Equivalent to [`TaskManagerMetrics::new`].
    fn default() -> Self {
        TaskManagerMetrics::new()
    }
}

#[cfg(feature = "google-books")]
impl TaskManagerMetrics {
    /// Creates a metrics block with every gauge and counter at zero.
    pub fn new() -> Self {
        let regular_queue_depth = AtomicUsize::new(0);
        let retry_queue_depth = AtomicUsize::new(0);
        let total_retries = AtomicU64::new(0);
        let total_failures = AtomicU64::new(0);
        let total_successes = AtomicU64::new(0);
        Self {
            regular_queue_depth,
            retry_queue_depth,
            total_retries,
            total_failures,
            total_successes,
        }
    }

    /// Copies the current values into a plain [`MetricsSnapshot`].
    ///
    /// Each field is read with its own relaxed load, so the snapshot is not an
    /// atomic view across fields.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let order = Ordering::Relaxed;
        let regular_queue_depth = self.regular_queue_depth.load(order);
        let retry_queue_depth = self.retry_queue_depth.load(order);
        let total_retries = self.total_retries.load(order);
        let total_failures = self.total_failures.load(order);
        let total_successes = self.total_successes.load(order);
        MetricsSnapshot {
            regular_queue_depth,
            retry_queue_depth,
            total_retries,
            total_failures,
            total_successes,
        }
    }
}
/// Plain-value copy of `TaskManagerMetrics` as produced by `TaskManagerMetrics::snapshot`.
#[cfg(feature = "google-books")]
#[derive(Debug, Clone, Copy)]
pub struct MetricsSnapshot {
/// Value of the regular-queue depth gauge at snapshot time.
pub regular_queue_depth: usize,
/// Value of the retry-queue depth gauge at snapshot time.
pub retry_queue_depth: usize,
/// Value of the retry counter at snapshot time.
pub total_retries: u64,
/// Value of the failure counter at snapshot time.
pub total_failures: u64,
/// Value of the success counter at snapshot time.
pub total_successes: u64,
}
/// Hands out jobs from two sources: a bounded channel of regular jobs and a
/// time-ordered queue of retries. Due retries are served before regular jobs.
#[cfg(feature = "google-books")]
pub struct TaskManager {
/// Receiving half of the regular job channel; the sending half lives in `TaskSubmitter`.
regular_rx: Mutex<mpsc::Receiver<Job>>,
/// Retries waiting for their due time.
retry_queue: Mutex<RetryQueue>,
/// Shutdown flag; `get_next_task` returns `None` once it reads `true`.
shutdown: watch::Receiver<bool>,
/// Counters shared with every `TaskSubmitter` created alongside this manager.
metrics: Arc<TaskManagerMetrics>,
/// Backoff settings used by `schedule_retry`.
config: TaskManagerConfig,
}
#[cfg(feature = "google-books")]
impl TaskManager {
    /// Creates a manager together with the submitter that feeds it.
    ///
    /// * `capacity` – bound of the regular job channel.
    /// * `config` – backoff settings used by [`TaskManager::schedule_retry`].
    /// * `shutdown` – watch channel; once it holds `true`,
    ///   [`TaskManager::get_next_task`] returns `None`.
    ///
    /// # Panics
    ///
    /// `tokio::sync::mpsc::channel` panics when `capacity` is `0`.
    pub fn new(
        capacity: usize,
        config: TaskManagerConfig,
        shutdown: watch::Receiver<bool>,
    ) -> (Self, TaskSubmitter) {
        let (regular_tx, regular_rx) = mpsc::channel(capacity);
        let metrics = Arc::new(TaskManagerMetrics::new());
        let manager = Self {
            regular_rx: Mutex::new(regular_rx),
            retry_queue: Mutex::new(RetryQueue::new()),
            shutdown,
            metrics: Arc::clone(&metrics),
            config,
        };
        let submitter = TaskSubmitter {
            regular_tx,
            metrics,
        };
        (manager, submitter)
    }

    /// Waits for and returns the next job to process.
    ///
    /// Priority order on every wake-up: shutdown, then a retry whose due time
    /// has passed, then the next regular job.
    ///
    /// Returns `None` when the shutdown flag is `true`, or when the regular
    /// channel is closed and drained while no retry is queued.
    ///
    /// Behavioural notes:
    /// * If the shutdown sender has been dropped, no shutdown can ever be
    ///   signalled; the method then simply stops watching the flag instead of
    ///   spinning on the permanently-failing `changed()` call.
    /// * If the regular channel is closed while retries are still queued, the
    ///   method sleeps until the earliest retry is due rather than polling the
    ///   closed channel in a tight loop.
    /// * The retry deadline also covers the time spent waiting for the
    ///   receiver mutex, so a due retry is not delayed by other callers.
    /// * NOTE(review): a retry scheduled while this call is already waiting
    ///   with an empty retry queue is only noticed at the next wake-up
    ///   (regular job, shutdown change); there is no notification from
    ///   `schedule_retry`.
    pub async fn get_next_task(&self) -> Option<Job> {
        // A single receiver for the whole call: after `changed()` has reported
        // an update once, later calls only fire for *new* updates.
        let mut shutdown_rx = self.shutdown.clone();
        let mut shutdown_open = true;
        let mut regular_open = true;

        loop {
            if *shutdown_rx.borrow() {
                return None;
            }

            // One critical section: serve a due retry or learn how long to wait.
            let retry_wait = {
                let mut retry_queue = self.retry_queue.lock().await;
                if let Some(job) = retry_queue.pop_if_ready() {
                    self.metrics
                        .retry_queue_depth
                        .store(retry_queue.len(), Ordering::Relaxed);
                    return Some(job);
                }
                Self::time_until_head(&retry_queue)
            };

            if retry_wait.is_none() && !regular_open {
                // Nothing queued for retry and no regular job can ever arrive.
                return None;
            }

            // Lock acquisition and `recv` are both cancel-safe, so dropping
            // this future when another branch wins loses no message.
            let next_regular = async {
                let mut regular_rx = self.regular_rx.lock().await;
                regular_rx.recv().await
            };

            tokio::select! {
                biased;
                changed = shutdown_rx.changed(), if shutdown_open => {
                    if changed.is_err() {
                        // Sender dropped: stop polling, the flag cannot change any more.
                        shutdown_open = false;
                    }
                    // On `Ok` the flag is re-read at the top of the loop.
                }
                received = next_regular, if regular_open => {
                    match received {
                        Some(job) => {
                            self.metrics
                                .regular_queue_depth
                                .fetch_sub(1, Ordering::Relaxed);
                            return Some(job);
                        }
                        // Closed and drained; only retries can still be served.
                        None => regular_open = false,
                    }
                }
                _ = tokio::time::sleep(retry_wait.unwrap_or(Duration::ZERO)), if retry_wait.is_some() => {
                    // Earliest retry is due; the next iteration pops it.
                }
            }
        }
    }

    /// Queues `job` for a later retry.
    ///
    /// The delay is taken from `retry_after` when provided, otherwise computed
    /// by [`TaskManagerConfig::compute_backoff`] from `job.attempt`. Also
    /// updates the retry-queue depth gauge and increments `total_retries`.
    ///
    /// A `retry_after` hint may be arbitrarily large. If adding it to the
    /// current instant would overflow (which would panic with `+`), the
    /// configured `max_backoff_ms` is used instead.
    pub async fn schedule_retry(&self, job: Job, retry_after: Option<RetryAfter>) {
        let backoff = match retry_after {
            Some(hint) => hint.to_duration(),
            None => self.config.compute_backoff(job.attempt),
        };
        let now = Instant::now();
        let ready_at = now
            .checked_add(backoff)
            .or_else(|| now.checked_add(Duration::from_millis(self.config.max_backoff_ms)))
            .unwrap_or(now);

        let mut retry_queue = self.retry_queue.lock().await;
        retry_queue.push(job, ready_at);
        self.metrics
            .retry_queue_depth
            .store(retry_queue.len(), Ordering::Relaxed);
        self.metrics.total_retries.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the failure counter.
    pub fn record_failure(&self) {
        self.metrics.total_failures.fetch_add(1, Ordering::Relaxed);
    }

    /// Increments the success counter.
    pub fn record_success(&self) {
        self.metrics.total_successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Time until the earliest queued retry is due.
    ///
    /// `None` when no retry is queued; `Some(Duration::ZERO)` when the
    /// earliest retry is already due.
    pub async fn time_until_next_retry(&self) -> Option<Duration> {
        let retry_queue = self.retry_queue.lock().await;
        Self::time_until_head(&retry_queue)
    }

    /// Shared metrics handle.
    pub fn metrics(&self) -> &Arc<TaskManagerMetrics> {
        &self.metrics
    }

    /// Configuration this manager was created with.
    pub fn config(&self) -> &TaskManagerConfig {
        &self.config
    }

    /// `true` when no retry is queued and the regular-queue depth gauge is zero.
    ///
    /// The regular side is judged from the metrics gauge, not from the channel
    /// itself, so the answer is only as accurate as that gauge.
    pub async fn is_empty(&self) -> bool {
        let retry_empty = self.retry_queue.lock().await.is_empty();
        let regular_depth = self.metrics.regular_queue_depth.load(Ordering::Relaxed);
        retry_empty && regular_depth == 0
    }

    /// Remaining wait for the head of `queue`: `None` if empty, zero if overdue.
    fn time_until_head(queue: &RetryQueue) -> Option<Duration> {
        queue
            .peek_ready_at()
            .map(|ready_at| ready_at.saturating_duration_since(Instant::now()))
    }
}
/// Cloneable handle for feeding regular jobs to a `TaskManager`.
#[cfg(feature = "google-books")]
#[derive(Clone)]
pub struct TaskSubmitter {
/// Sending half of the regular job channel.
regular_tx: mpsc::Sender<Job>,
/// Counters shared with the `TaskManager` created alongside this submitter.
metrics: Arc<TaskManagerMetrics>,
}
#[cfg(feature = "google-books")]
impl TaskSubmitter {
    /// Submits `job`, waiting for channel capacity if necessary.
    ///
    /// # Errors
    ///
    /// Returns the job back when the receiving side has been closed.
    ///
    /// The depth gauge is incremented *before* the job becomes visible to the
    /// consumer. Incrementing afterwards lets the consumer's decrement run
    /// first, wrapping the unsigned gauge to `usize::MAX` for a moment. A slot
    /// is reserved first so that cancelling this future while it waits for
    /// capacity leaves the gauge untouched.
    pub async fn submit(&self, job: Job) -> Result<(), Job> {
        match self.regular_tx.reserve().await {
            Ok(permit) => {
                self.metrics
                    .regular_queue_depth
                    .fetch_add(1, Ordering::Relaxed);
                // Synchronous and infallible: the slot is already reserved.
                permit.send(job);
                Ok(())
            }
            Err(_) => Err(job),
        }
    }

    /// Submits `job` without waiting.
    ///
    /// # Errors
    ///
    /// Returns the job back when the channel is full or closed.
    ///
    /// The gauge is incremented first and rolled back on failure, so it can
    /// briefly over-count but never wraps below zero.
    pub fn try_submit(&self, job: Job) -> Result<(), Job> {
        let depth = &self.metrics.regular_queue_depth;
        depth.fetch_add(1, Ordering::Relaxed);
        self.regular_tx.try_send(job).map_err(|err| {
            depth.fetch_sub(1, Ordering::Relaxed);
            match err {
                mpsc::error::TrySendError::Full(job) => job,
                mpsc::error::TrySendError::Closed(job) => job,
            }
        })
    }

    /// Shared metrics handle.
    pub fn metrics(&self) -> &Arc<TaskManagerMetrics> {
        &self.metrics
    }

    /// `true` once the receiving side of the job channel has been closed or dropped.
    pub fn is_closed(&self) -> bool {
        self.regular_tx.is_closed()
    }
}
// Unit tests for retry-hint parsing, backoff computation, the retry queue and
// the task manager. Compiled only for test builds with the `google-books` feature.
#[cfg(all(test, feature = "google-books"))]
mod tests {
use super::*;
use chrono::Datelike;
// Plain integers parse as seconds; surrounding whitespace is ignored.
#[test]
fn test_retry_after_parse_seconds() {
assert_eq!(RetryAfter::parse("120"), Some(RetryAfter::Seconds(120)));
assert_eq!(RetryAfter::parse("0"), Some(RetryAfter::Seconds(0)));
assert_eq!(RetryAfter::parse("3600"), Some(RetryAfter::Seconds(3600)));
assert_eq!(RetryAfter::parse(" 60 "), Some(RetryAfter::Seconds(60)));
}
// An RFC 2822-style date with a GMT zone parses into the DateTime variant.
#[test]
fn test_retry_after_parse_http_date() {
let result = RetryAfter::parse("Wed, 21 Oct 2015 07:28:00 GMT");
assert!(matches!(result, Some(RetryAfter::DateTime(_))));
if let Some(RetryAfter::DateTime(dt)) = result {
assert_eq!(dt.year(), 2015);
assert_eq!(dt.month(), 10);
assert_eq!(dt.day(), 21);
}
}
// RFC 3339 timestamps are accepted as well.
#[test]
fn test_retry_after_parse_rfc3339() {
let result = RetryAfter::parse("2024-01-15T10:30:00Z");
assert!(matches!(result, Some(RetryAfter::DateTime(_))));
}
// Non-numeric text, the empty string and negative numbers are rejected.
#[test]
fn test_retry_after_parse_invalid() {
assert_eq!(RetryAfter::parse("invalid"), None);
assert_eq!(RetryAfter::parse(""), None);
assert_eq!(RetryAfter::parse("-100"), None);
}
// A seconds hint converts to the same number of seconds.
#[test]
fn test_retry_after_to_duration_seconds() {
let ra = RetryAfter::Seconds(60);
let duration = ra.to_duration();
assert_eq!(duration, Duration::from_secs(60));
}
// A deadline in the past means "retry immediately".
#[test]
fn test_retry_after_to_duration_past_datetime() {
let past = Utc::now() - chrono::Duration::hours(1);
let ra = RetryAfter::DateTime(past);
assert_eq!(ra.to_duration(), Duration::ZERO);
}
// Pins the documented default configuration values.
#[test]
fn test_config_default() {
let config = TaskManagerConfig::default();
assert_eq!(config.max_retries, 5);
assert_eq!(config.initial_backoff_ms, 1000);
assert_eq!(config.max_backoff_ms, 60_000);
assert!((config.jitter_factor - 0.25).abs() < f64::EPSILON);
}
// With jitter disabled the backoff doubles per attempt and is capped at
// max_backoff_ms (attempt 6 would be 64 s, capped to 60 s).
#[test]
fn test_config_compute_backoff_exponential() {
let config = TaskManagerConfig {
max_retries: 5,
initial_backoff_ms: 1000,
max_backoff_ms: 60_000,
jitter_factor: 0.0, };
let d0 = config.compute_backoff(0);
assert_eq!(d0, Duration::from_millis(1000));
let d1 = config.compute_backoff(1);
assert_eq!(d1, Duration::from_millis(2000));
let d2 = config.compute_backoff(2);
assert_eq!(d2, Duration::from_millis(4000));
let d5 = config.compute_backoff(5);
assert_eq!(d5, Duration::from_millis(32000));
let d6 = config.compute_backoff(6);
assert_eq!(d6, Duration::from_millis(60000));
}
// With 25 % jitter, attempt 0 must stay within 1000 ms +/- 250 ms.
// Repeated because the jitter is random.
#[test]
fn test_config_compute_backoff_with_jitter() {
let config = TaskManagerConfig {
max_retries: 5,
initial_backoff_ms: 1000,
max_backoff_ms: 60_000,
jitter_factor: 0.25,
};
for _ in 0..100 {
let d = config.compute_backoff(0);
let ms = d.as_millis() as u64;
assert!(
(750..=1250).contains(&ms),
"Backoff {} outside expected range [750, 1250]",
ms
);
}
}
// A new job starts at attempt 0 with the initial backoff.
#[test]
fn test_job_new() {
let job = Job::new("https://example.com/file.gz", "th", 2);
assert_eq!(&*job.url, "https://example.com/file.gz");
assert_eq!(&*job.prefix, "th");
assert_eq!(job.order, 2);
assert_eq!(job.attempt, 0);
assert_eq!(job.backoff_ms, INITIAL_BACKOFF_MS);
}
// Each retry increments the attempt and doubles the backoff.
#[test]
fn test_job_with_retry() {
let job = Job::new("https://example.com/file.gz", "th", 2);
let retry1 = job.with_retry();
assert_eq!(&*retry1.url, "https://example.com/file.gz");
assert_eq!(retry1.attempt, 1);
assert_eq!(retry1.backoff_ms, 2000);
let retry2 = retry1.with_retry();
assert_eq!(retry2.attempt, 2);
assert_eq!(retry2.backoff_ms, 4000);
}
// After MAX_RETRIES retries the job may no longer be retried.
#[test]
fn test_job_can_retry() {
let mut job = Job::new("https://example.com/file.gz", "th", 2);
assert!(job.can_retry());
for _ in 0..MAX_RETRIES {
job = job.with_retry();
}
assert!(!job.can_retry());
}
// Entries pushed out of order: the earliest due time must be at the head.
#[test]
fn test_retry_queue_ordering() {
let mut queue = RetryQueue::new();
let now = Instant::now();
let job1 = Job::new("url1", "a", 1);
let job2 = Job::new("url2", "b", 1);
let job3 = Job::new("url3", "c", 1);
queue.push(job3.clone(), now + Duration::from_secs(30));
queue.push(job1.clone(), now + Duration::from_secs(10));
queue.push(job2.clone(), now + Duration::from_secs(20));
assert_eq!(queue.len(), 3);
assert_eq!(queue.peek_ready_at(), Some(now + Duration::from_secs(10)));
}
// Only the entry whose due time has passed is popped; the future one stays queued.
#[test]
fn test_retry_queue_pop_if_ready() {
let mut queue = RetryQueue::new();
let now = Instant::now();
let job1 = Job::new("url1", "a", 1);
let job2 = Job::new("url2", "b", 1);
// job1 is already due (one second ago); job2 is due in one minute.
queue.push(job1.clone(), now - Duration::from_secs(1)); queue.push(job2.clone(), now + Duration::from_secs(60));
let popped = queue.pop_if_ready();
assert!(popped.is_some());
assert_eq!(&*popped.unwrap().prefix, "a");
let not_ready = queue.pop_if_ready();
assert!(not_ready.is_none());
assert_eq!(queue.len(), 1);
}
// snapshot() must copy every counter verbatim.
#[test]
fn test_metrics_snapshot() {
let metrics = TaskManagerMetrics::new();
metrics.regular_queue_depth.store(10, Ordering::Relaxed);
metrics.retry_queue_depth.store(5, Ordering::Relaxed);
metrics.total_retries.store(100, Ordering::Relaxed);
metrics.total_failures.store(3, Ordering::Relaxed);
metrics.total_successes.store(97, Ordering::Relaxed);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.regular_queue_depth, 10);
assert_eq!(snapshot.retry_queue_depth, 5);
assert_eq!(snapshot.total_retries, 100);
assert_eq!(snapshot.total_failures, 3);
assert_eq!(snapshot.total_successes, 97);
}
// Regular jobs come out in submission order; after shutdown is signalled
// get_next_task returns None.
#[tokio::test]
async fn test_task_manager_basic_flow() {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let config = TaskManagerConfig::default();
let (manager, submitter) = TaskManager::new(100, config, shutdown_rx);
let job1 = Job::new("url1", "a", 1);
let job2 = Job::new("url2", "b", 1);
submitter.submit(job1).await.expect("submit job1");
submitter.submit(job2).await.expect("submit job2");
let got1 = manager.get_next_task().await;
assert!(got1.is_some());
assert_eq!(&*got1.unwrap().prefix, "a");
let got2 = manager.get_next_task().await;
assert!(got2.is_some());
assert_eq!(&*got2.unwrap().prefix, "b");
shutdown_tx.send(true).expect("send shutdown");
let got3 = manager.get_next_task().await;
assert!(got3.is_none());
}
// A retry that is already due is handed out before a queued regular job.
#[tokio::test]
async fn test_task_manager_retry_priority() {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let config = TaskManagerConfig::default();
let (manager, submitter) = TaskManager::new(100, config, shutdown_rx);
let regular_job = Job::new("regular", "regular", 1);
submitter.submit(regular_job).await.expect("submit regular");
let retry_job = Job::new("retry", "retry", 1).with_retry();
manager
.schedule_retry(retry_job, Some(RetryAfter::Seconds(0)))
.await;
let got1 = manager.get_next_task().await;
assert!(got1.is_some());
assert_eq!(&*got1.unwrap().prefix, "retry");
let got2 = manager.get_next_task().await;
assert!(got2.is_some());
assert_eq!(&*got2.unwrap().prefix, "regular");
shutdown_tx.send(true).expect("send shutdown");
}
// Submitting, receiving, scheduling a retry and recording outcomes must each
// be reflected in the shared metrics. The shutdown sender is bound to a named
// variable so it stays alive for the whole test.
#[tokio::test]
async fn test_task_manager_metrics_tracking() {
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
let config = TaskManagerConfig::default();
let (manager, submitter) = TaskManager::new(100, config, shutdown_rx);
submitter
.submit(Job::new("url1", "a", 1))
.await
.expect("submit");
submitter
.submit(Job::new("url2", "b", 1))
.await
.expect("submit");
let snapshot = manager.metrics().snapshot();
assert_eq!(snapshot.regular_queue_depth, 2);
let _ = manager.get_next_task().await;
let snapshot = manager.metrics().snapshot();
assert_eq!(snapshot.regular_queue_depth, 1);
let retry_job = Job::new("retry", "r", 1).with_retry();
manager.schedule_retry(retry_job, None).await;
let snapshot = manager.metrics().snapshot();
assert_eq!(snapshot.retry_queue_depth, 1);
assert_eq!(snapshot.total_retries, 1);
manager.record_success();
manager.record_failure();
let snapshot = manager.metrics().snapshot();
assert_eq!(snapshot.total_successes, 1);
assert_eq!(snapshot.total_failures, 1);
}
}