Skip to main content

simple_queue/
sync.rs

1use std::{any::Any, sync::Arc};
2
3use crate::job::Job;
4
5/// Type erased permit that should be returned by JobStrategy
6pub struct Permit {
7    _permit: std::sync::Arc<dyn Any + Send + Sync>,
8}
9impl Permit {
10    /// Creates a new Permit from any value that implements `Any + Send + Sync`
11    pub fn new<T: Any + Send + Sync>(value: T) -> Self {
12        Self {
13            _permit: Arc::new(value),
14        }
15    }
16}
17
18/// When aquiring a permit from a JobStrategy, permit giver can return
19/// an error to impact on the job's processing
20#[derive(Debug)]
21pub enum JobStrategyError {
22    /// The job shouldn't be processed, mark as canceled
23    /// (e.g. feature flags, resource no longer available, deprecated jobs etc.)
24    CancelJob,
25    /// The job should be tried again after a delay
26    /// (e.g. resource temporarily unavailable, renewable limits, etc. )
27    TryAfter(chrono::Duration),
28    /// Retry with backoff strategy (consumes attempt)
29    Retry,
30    /// Mark job as completed without processing,
31    /// e.g. when seasonal job backed off far into future is no longer relevant, but shouldn't
32    /// be sent to dead-letter queue
33    MarkCompleted,
34}
35
36/// Trait for job strategies that define how to acquire final, 3rd permit
37pub trait JobStrategy: Send + Sync {
38    fn acquire(&self, job: &Job)
39    -> crate::handler::BoxFuture<'_, Result<Permit, JobStrategyError>>;
40}
41pub struct InstantStrategy {}
42impl JobStrategy for InstantStrategy {
43    fn acquire(
44        &self,
45        _job: &Job,
46    ) -> crate::handler::BoxFuture<'_, Result<Permit, JobStrategyError>> {
47        Box::pin(async move { Ok(Permit::new(())) })
48    }
49}
50#[derive(Clone)]
51pub enum BackoffStrategy {
52    Linear {
53        delay: chrono::Duration,
54    },
55    Exponential {
56        factor: f64,
57        max_delay: chrono::Duration,
58    },
59    Custom(fn(i32) -> chrono::Duration),
60}
61impl Default for BackoffStrategy {
62    fn default() -> Self {
63        Self::Linear {
64            delay: chrono::Duration::seconds(5),
65        }
66    }
67}
68impl BackoffStrategy {
69    #[tracing::instrument(skip_all,fields(job_id=%job.id))]
70    pub fn next_attempt(&self, job: &Job) -> chrono::DateTime<chrono::Utc> {
71        match self {
72            BackoffStrategy::Linear { delay } => chrono::Utc::now() + *delay,
73            BackoffStrategy::Exponential { factor, max_delay } => {
74                let delay = chrono::Duration::seconds((job.attempt as f64).powf(*factor) as i64);
75                chrono::Utc::now() + delay.min(*max_delay)
76            }
77            BackoffStrategy::Custom(f) => chrono::Utc::now() + f(job.attempt),
78        }
79    }
80}