use anyhow::anyhow;
use async_trait::async_trait;
use dyn_clone::DynClone;
use redis::ToRedisArgs;
use redis::{from_redis_value, FromRedisValue};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::hash::Hash;
use std::str::FromStr;
use std::time::Duration;
use thiserror::Error;
use time::OffsetDateTime;
use uuid::Uuid;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize, Hash)]
pub enum Backoff {
Linear(Duration),
Exponential(Duration),
Default,
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize, Hash)]
pub enum RetryPolicy {
Forever { backoff: Backoff },
Count { backoff: Backoff, count: usize },
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize, Hash)]
pub enum Queue {
Scheduled,
Enqueued,
Failed,
Working,
Retrying,
Processed,
Dead,
}
impl Queue {
pub fn namespaced(self, name: &str) -> String {
format!("{}:{}", self, name)
}
}
impl Display for Queue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s: &'static str = match self {
Queue::Scheduled => "scheduled",
Queue::Enqueued => "enqueued",
Queue::Failed => "failed",
Queue::Working => "working",
Queue::Retrying => "retrying",
Queue::Processed => "processed",
Queue::Dead => "dead",
};
f.write_str(s)
}
}
impl FromStr for Queue {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let x: String = s.split(':').take(1).collect();
match x.as_str() {
"scheduled" => Ok(Queue::Scheduled),
"enqueued" => Ok(Queue::Enqueued),
"failed" => Ok(Queue::Failed),
"working" => Ok(Queue::Working),
"retrying" => Ok(Queue::Retrying),
"processed" => Ok(Queue::Processed),
"dead" => Ok(Queue::Dead),
_ => Err(anyhow!("NO!")),
}
}
}
#[derive(Error, Debug)]
pub enum JobError {
#[error("A panic occurred in the job: {}", .panic)]
PanicError { panic: String },
#[error(transparent)]
AnyError(#[from] anyhow::Error),
}
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct JobId(Uuid);
impl Display for JobId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
pub struct JobDescription {
pub version: usize,
pub job_id: JobId,
pub job_type: String,
pub namespace: String,
pub input: Option<JobInput>,
pub uniqueness_key: Option<String>,
pub uniqueness_policy: Option<UniqunessPolicy>,
pub retry_policy: Option<RetryPolicy>,
pub retry: Option<usize>,
pub backoff: Option<Duration>,
pub at: Option<time::OffsetDateTime>,
pub period: Option<String>,
pub created_at: OffsetDateTime,
pub enqueued_at: Option<OffsetDateTime>,
pub error_message: Option<String>,
}
impl JobDescription {
pub(crate) fn enqueue(&self) -> JobDescription {
JobDescription {
job_id: JobId(Uuid::new_v4()),
at: None,
..self.clone()
}
}
}
impl FromRedisValue for JobDescription {
fn from_redis_value(v: &redis::Value) -> redis::RedisResult<Self> {
let v: String = from_redis_value(v)?;
let parsed = serde_json::from_str(&v).map_err(std::io::Error::from)?;
Ok(parsed)
}
}
impl ToRedisArgs for JobDescription {
fn write_redis_args<W>(&self, out: &mut W)
where
W: ?Sized + redis::RedisWrite,
{
let s = serde_json::to_string(self).expect("INVARIANT VIOLATED: Failed to serialize job");
s.write_redis_args(out)
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub enum JobInput {
Plaintext(String),
EncryptedInput {
nonce: String,
ciphertext: String,
},
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub enum UniqunessPolicy {
Duration(Duration),
}
#[async_trait]
pub trait Job: Send + Sync + 'static + DynClone + std::fmt::Debug {
fn name(&self) -> &'static str;
async fn perform(&self, _: Option<String>) -> Result<Option<String>, anyhow::Error>;
fn queue(&self) -> &'static str {
"default"
}
fn uniqueness_key(&self, input: Option<&JobInput>) -> String {
let mut hasher = blake3::Hasher::new();
hasher.update(self.queue().as_bytes());
hasher.update(self.name().as_bytes());
hasher.update(serde_json::to_string(&input).unwrap().as_bytes());
hasher.finalize().to_string()
}
fn uniqueness_policy(&self) -> Option<UniqunessPolicy> {
None
}
fn retry_policy(&self) -> Option<RetryPolicy> {
Some(RetryPolicy::Count {
backoff: Backoff::Default,
count: 25,
})
}
fn expiry(&self) -> Option<Duration> {
None
}
fn to_job_description(&self, input: Option<JobInput>) -> JobDescription {
let now = time::OffsetDateTime::now_utc();
let uniqueness_policy = self.uniqueness_policy();
JobDescription {
version: 1,
job_id: JobId(Uuid::new_v4()),
job_type: self.name().to_string(),
retry: None,
retry_policy: self.retry_policy(),
backoff: None,
uniqueness_key: uniqueness_policy
.as_ref()
.map(|_| self.uniqueness_key(input.as_ref())),
uniqueness_policy,
input,
namespace: self.queue().to_string(),
at: None,
period: None,
created_at: now,
enqueued_at: None,
error_message: None,
}
}
}
#[async_trait]
impl Job for Box<dyn Job> {
fn name(&self) -> &'static str {
self.as_ref().name()
}
async fn perform(&self, arg: Option<String>) -> Result<Option<String>, anyhow::Error> {
self.as_ref().perform(arg).await
}
}
dyn_clone::clone_trait_object!(Job);