use crate::stores::Store;
#[cfg(feature = "redis-store")]
use crate::utils::to_redis_parsing_error;
use chrono::{DateTime, Utc};
use derive_more::{Display, FromStr};
#[cfg(feature = "redis-store")]
use redis::ToRedisArgs;
#[cfg(feature = "redis-store")]
use redis::{FromRedisValue, ParsingError, ToSingleRedisArg, Value};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
mod backoff;
mod delay;
mod repeat;
use crate::{job::delay::JobDelay, KioError};
pub use backoff::{BackOff, BackOffJobOptions, BackOffOptions, StoredFn};
pub use repeat::Repeat;
use std::time::Duration;
/// Per-run timing and accounting snapshot for a job, produced by
/// `Job::get_metrics`.
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, Display, Default)]
#[display("job {id}-#{attempt} , ran for {ran_for:?}, delayed for {delayed_for:?}")]
pub struct JobMetrics {
/// Wall-clock time from `processed_on` to `finished_on` of the run.
pub ran_for: Duration,
/// Wall-clock time from creation (`Job::ts`) to `processed_on`.
pub delayed_for: Duration,
/// Attempt counter, copied from `Job::attempts_made`.
pub attempt: u64,
/// Configured delay as computed by `JobDelay::as_diff_ms`
/// (NOTE(review): presumably milliseconds — confirm against `JobDelay`).
pub delay: u64,
/// Job id; 0 when the job had no id assigned.
pub id: u64,
}
pub type Dt = DateTime<Utc>;
/// Lifecycle states a job can move through.
///
/// JSON form is camelCase; the Redis wire form is the lowercase `Display`
/// string (see the `ToRedisArgs` impl below).
#[derive(
Debug,
Serialize,
Deserialize,
FromStr,
Default,
Hash,
Ord,
PartialOrd,
Display,
Clone,
Copy,
PartialEq,
Eq,
)]
#[serde(rename_all = "camelCase")]
pub enum JobState {
// New jobs start here.
#[default]
Wait,
Prioritized,
Stalled,
Active,
Paused,
Resumed,
Completed,
Failed,
Delayed,
Progress,
Obliterated,
Processing,
}
#[cfg(feature = "redis-store")]
impl ToRedisArgs for JobState {
    /// Writes the state as its lowercase display name (e.g. `wait`, `active`).
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + redis::RedisWrite,
    {
        let wire = self.to_string().to_lowercase();
        out.write_arg_fmt(wire);
    }
}
// Marker impl: a `JobState` encodes to exactly one Redis argument.
#[cfg(feature = "redis-store")]
impl ToSingleRedisArg for JobState {}
/// Per-job configuration supplied at enqueue time.
#[derive(Debug, Serialize, Deserialize, Default, Hash, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct JobOptions {
/// Scheduling priority (copied onto `Job::priority` by `Job::add_opts`).
pub priority: u64,
/// Desired delay before processing; converted via `JobDelay::as_diff_ms`.
pub delay: JobDelay,
/// Optional caller-chosen job id.
pub id: Option<u64>,
/// Attempt budget (NOTE(review): total attempts vs. retries — confirm).
pub attempts: u64,
/// Retention policy for completed jobs; `None` presumably means the store
/// default — verify against the store implementations.
pub remove_on_complete: Option<RemoveOnCompletionOrFailure>,
/// Retention policy for failed jobs.
pub remove_on_fail: Option<RemoveOnCompletionOrFailure>,
/// Retry backoff configuration, if any.
pub backoff: Option<BackOffJobOptions>,
/// Repeat/recurrence configuration, if any.
pub repeat: Option<Repeat>,
}
/// Retention policy accepted in several JSON shapes (the serde representation
/// is untagged): a bare bool, a bare integer, or a `KeepJobs` object.
#[derive(Debug, Deserialize, Serialize, Clone, Copy, Hash, PartialEq, Eq)]
#[serde(untagged)]
pub enum RemoveOnCompletionOrFailure {
Bool(bool),
Int(i64),
Opts(KeepJobs),
}
impl Default for RemoveOnCompletionOrFailure {
fn default() -> Self {
Self::Bool(false)
}
}
/// Fine-grained retention limits used by `RemoveOnCompletionOrFailure::Opts`.
#[derive(Debug, Default, Deserialize, Serialize, Clone, Copy, Hash, PartialEq, Eq)]
pub struct KeepJobs {
/// Maximum age to keep (NOTE(review): units not visible here — confirm
/// seconds vs. milliseconds against the store scripts).
pub age: Option<i64>,
/// Maximum number of jobs to keep.
pub count: Option<i64>,
}
/// One captured stack trace from a failed run.
#[derive(Debug, Default, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)]
pub struct Trace {
/// Run this trace belongs to (presumably the attempt number — confirm).
pub run: u64,
/// Error reason/message text.
pub reason: String,
/// Individual stack frames as text lines.
pub frames: Vec<String>,
}
/// Summary of the most recent failure, stored on `Job::failed_reason`.
#[derive(Debug, Default, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)]
pub struct FailedDetails {
/// Run the failure occurred on (presumably the attempt number — confirm).
pub run: u64,
/// Failure reason/message text.
pub reason: String,
}
use chrono::serde::{ts_microseconds, ts_microseconds_option};
use derive_more::Debug;
/// A single unit of work flowing through a queue.
///
/// Type parameters: `D` = payload data, `R` = value returned by the
/// processor, `P` = progress value; all three are skipped in `Debug` output.
/// Fields serialize in camelCase; timestamps go over the wire as microsecond
/// epoch integers (`ts_microseconds` / `ts_microseconds_option`).
#[derive(Debug, Serialize, Deserialize, Default, Hash, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Job<D, R, P> {
    /// Job id; `None` until one is assigned.
    pub id: Option<u64>,
    /// Creation timestamp (wire name: `timestamp`).
    // The former `alias = "timestamp"` duplicated the `rename` target and was
    // a deserialization no-op, so it has been dropped.
    #[serde(rename = "timestamp")]
    #[serde(with = "ts_microseconds")]
    pub ts: Dt,
    /// Job name.
    pub name: String,
    /// Current lifecycle state.
    pub state: JobState,
    /// Latest progress value reported, if any.
    #[debug(skip)]
    pub progress: Option<P>,
    /// Number of attempts made so far.
    pub attempts_made: u64,
    /// Options the job was enqueued with.
    pub opts: JobOptions,
    /// Cached delay derived from `opts.delay` (see `add_opts`).
    pub delay: u64,
    /// Job payload.
    #[debug(skip)]
    pub data: Option<D>,
    /// Value returned by a completed run, if any.
    #[debug(skip)]
    pub returned_value: Option<R>,
    /// Stack traces collected from failed runs.
    pub stack_trace: Vec<Trace>,
    /// Details recorded for the most recent failure, if any.
    pub failed_reason: Option<FailedDetails>,
    /// When processing started (microsecond epoch on the wire).
    #[serde(with = "ts_microseconds_option")]
    pub processed_on: Option<Dt>,
    /// When processing finished (microsecond epoch on the wire).
    #[serde(with = "ts_microseconds_option")]
    pub finished_on: Option<Dt>,
    /// Owning queue name, if known.
    pub queue_name: Option<String>,
    /// Token currently associated with the job, if any.
    pub token: Option<JobToken>,
    /// How many times the job has been counted as stalled.
    pub stalled_counter: u64,
    /// Log lines attached to the job.
    pub logs: Vec<String>,
    /// Effective priority (copied from `opts.priority` by `add_opts`).
    pub priority: u64,
}
#[cfg(feature = "redis-store")]
impl FromRedisValue for JobState {
    /// Parses a job state from a Redis value.
    ///
    /// Accepts either the bare state string (e.g. `wait`) or a JSON-encoded
    /// string (e.g. `"wait"`); the JSON form is the fallback.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let mut bytes: Vec<u8> = Vec::from_redis_value(v)?;
        // Try the plain-string form by borrowing the buffer — the original
        // cloned the whole Vec just to attempt `FromStr`. Only fall back to
        // JSON decoding (which needs `&mut` access) if that fails; invalid
        // UTF-8 also takes the JSON path and errors there.
        if let Ok(s) = std::str::from_utf8(&bytes) {
            if let Ok(state) = Self::from_str(s) {
                return Ok(state);
            }
        }
        simd_json::from_slice(&mut bytes).map_err(to_redis_parsing_error)
    }
}
use uuid::Uuid;
/// Composite job token: two v4 UUIDs plus a counter.
///
/// Displayed as `{uuid}-{uuid}-{n}`; over Redis it travels as its JSON
/// encoding (see the `ToRedisArgs`/`FromRedisValue` impls below).
#[derive(
Debug,
derive_more::Display,
Clone,
Copy,
Hash,
PartialEq,
Eq,
PartialOrd,
Ord,
Serialize,
Deserialize,
)]
#[display("{_0}-{_1}-{_2}")]
pub struct JobToken(pub Uuid, pub Uuid, pub u64);
#[cfg(feature = "redis-store")]
impl FromRedisValue for JobToken {
    /// Decodes a JSON-encoded token, rejecting the literal `null` payload.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let mut raw: Vec<u8> = Vec::from_redis_value(v)?;
        if raw == b"null" {
            return Err(ParsingError::from("null passed"));
        }
        simd_json::from_slice(&mut raw).map_err(to_redis_parsing_error)
    }
}
impl Default for JobToken {
fn default() -> Self {
Self(Uuid::new_v4(), Uuid::new_v4(), Default::default())
}
}
#[cfg(feature = "redis-store")]
impl ToRedisArgs for JobToken {
    /// Writes the token as its JSON encoding (empty string if encoding fails).
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + redis::RedisWrite,
    {
        let encoded = simd_json::to_string(self).unwrap_or_default();
        out.write_arg_fmt(encoded);
    }
}
// Marker impl: a `JobToken` encodes to exactly one Redis argument.
#[cfg(feature = "redis-store")]
impl ToSingleRedisArg for JobToken {}
impl<D, R, P> Job<D, R, P> {
    /// Moves the job onto the heap.
    pub fn boxed(self) -> Box<Self> {
        Box::new(self)
    }

    /// Builds a fresh job in the default (`Wait`) state, timestamped now,
    /// with default options and every run-related field zeroed/empty.
    pub fn new(name: &str, data: Option<D>, id: Option<u64>, queue_name: Option<&str>) -> Self {
        Self {
            id,
            ts: Utc::now(),
            name: name.to_owned(),
            state: JobState::default(),
            progress: None,
            attempts_made: 0,
            opts: JobOptions::default(),
            delay: 0,
            data,
            returned_value: None,
            stack_trace: Vec::new(),
            failed_reason: None,
            processed_on: None,
            finished_on: None,
            queue_name: queue_name.map(String::from),
            token: None,
            stalled_counter: 0,
            logs: Vec::new(),
            priority: 0,
        }
    }

    /// Summarizes timing for the most recent run.
    ///
    /// Missing timestamps fall back to their defaults and negative spans
    /// clamp to zero via `unwrap_or_default`, so this always returns `Some`.
    pub fn get_metrics(&self) -> Option<JobMetrics> {
        let started = self.processed_on.unwrap_or_default();
        let ended = self.finished_on.unwrap_or_default();
        Some(JobMetrics {
            ran_for: (ended - started).to_std().unwrap_or_default(),
            delayed_for: (started - self.ts).to_std().unwrap_or_default(),
            attempt: self.attempts_made,
            delay: self.opts.delay.as_diff_ms(self.ts).cast_unsigned(),
            id: self.id.unwrap_or_default(),
        })
    }

    /// Applies `opts`, refreshing the cached `priority` and `delay` fields.
    pub fn add_opts(&mut self, opts: JobOptions) {
        let delay = opts.delay.as_diff_ms(self.ts).cast_unsigned();
        self.priority = opts.priority;
        self.delay = delay;
        self.opts = opts;
    }

    /// Synchronously persists a new progress value through `store`.
    pub fn update_progress_sync<C>(&mut self, value: P, store: &C) -> Result<(), KioError>
    where
        P: Serialize + Clone,
        C: Store<D, R, P>,
    {
        store.update_job_progress_sync(self, value)
    }

    /// Asynchronously persists a new progress value through `store`.
    #[allow(clippy::future_not_send)]
    pub async fn update_progress<C>(&mut self, value: P, store: &C) -> Result<(), KioError>
    where
        P: Serialize + Clone,
        C: Store<D, R, P>,
    {
        store.update_job_progress(self, value).await
    }
}
#[cfg(feature = "redis-store")]
impl<D, R, P> FromRedisValue for Job<D, R, P>
where
D: for<'de> Deserialize<'de>, R: for<'de> Deserialize<'de>, P: for<'de> Deserialize<'de>,
{
/// Reconstructs a `Job` from a Redis map-like value.
///
/// Starts from an empty `Job::new("", None, None, None)` and fills in each
/// recognized key; unrecognized keys are skipped, so absent fields keep
/// their defaults. Both lowercase and camelCase key spellings are accepted.
fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
// Shorthand for mapping simd-json errors into redis parsing errors.
let other = to_redis_parsing_error;
let mut job: Self = Self::new("", None, None, None);
let map = v
.as_map_iter()
.ok_or_else(|| ParsingError::from("failed to extract map"))?;
for (key, value) in map {
// Only bulk-string key/value pairs carry job fields; anything else
// in the map is ignored.
if let (Value::BulkString(key), Value::BulkString(bytes)) = (key, value) {
// simd-json mutates its input buffer, hence the local copy.
let mut bytes = bytes.clone();
match key.as_slice() {
b"id" => job.id = simd_json::from_slice(&mut bytes).map_err(other)?,
b"timestamp" => {
// Stored as a microsecond epoch integer; a missing or
// out-of-range value falls back to the default timestamp.
job.ts = simd_json::from_slice::<Option<u64>>(&mut bytes)
.map_err(other)?
.and_then(|t| Dt::from_timestamp_micros(t.cast_signed()))
.unwrap_or_default();
}
b"opts" => job.opts = simd_json::from_slice(&mut bytes).map_err(other)?,
b"name" => job.name = simd_json::from_slice(&mut bytes).map_err(other)?,
b"queuename" | b"queueName" => {
job.queue_name = simd_json::from_slice(&mut bytes).map_err(other)?;
}
// State is delegated to `JobState`'s own redis parser, which also
// understands the non-JSON lowercase string form.
b"state" => job.state = JobState::from_redis_value_ref(value)?,
b"token" => {
// Best-effort: an unparsable token becomes `None` instead of
// failing the whole job.
job.token = simd_json::from_slice(&mut bytes).unwrap_or_default();
}
b"progress" => {
job.progress = simd_json::from_slice(&mut bytes).map_err(other)?;
}
b"attemptsmade" | b"attemptsMade" => {
job.attempts_made = simd_json::from_slice(&mut bytes).map_err(other)?;
}
b"delay" => job.delay = simd_json::from_slice(&mut bytes).map_err(other)?,
b"priority" => {
job.priority = simd_json::from_slice(&mut bytes).map_err(other)?;
}
b"data" => job.data = simd_json::from_slice(&mut bytes).map_err(other)?,
b"returnedvalue" | b"returnedValue" => {
job.returned_value = simd_json::from_slice(&mut bytes).map_err(other)?;
}
b"stacktrace" | b"stackTrace" => {
job.stack_trace = simd_json::from_slice(&mut bytes).map_err(other)?;
}
b"logs" => job.logs = simd_json::from_slice(&mut bytes).map_err(other)?,
b"failedreason" | b"failedReason" => {
job.failed_reason = simd_json::from_slice(&mut bytes).map_err(other)?;
}
// Optional timestamps: same microsecond-epoch convention as
// `timestamp`, but `None` is preserved rather than defaulted.
b"processedon" | b"processedOn" => {
job.processed_on = simd_json::from_slice::<Option<u64>>(&mut bytes)
.map_err(other)?
.and_then(|t| Dt::from_timestamp_micros(t.cast_signed()));
} b"finishedon" | b"finishedOn" => {
job.finished_on = simd_json::from_slice::<Option<u64>>(&mut bytes)
.map_err(other)?
.and_then(|t| Dt::from_timestamp_micros(t.cast_signed()));
}
b"stalledcounter" | b"stalledCounter" => {
job.stalled_counter = simd_json::from_slice(&mut bytes).map_err(other)?;
}
// Unknown keys are intentionally skipped.
_ => { }
}
}
}
Ok(job)
}
}