use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::OxanaError;
use crate::worker::Job;
pub type JobId = String;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct JobEnvelope {
pub id: JobId,
pub job: JobData,
pub queue: String,
pub meta: JobMeta,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct JobData {
pub name: String,
pub args: serde_json::Value,
}
fn default_resurrect() -> bool {
true
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct JobMeta {
pub id: JobId,
pub retries: u32,
pub unique: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub on_conflict: Option<JobConflictStrategy>,
pub created_at: i64,
#[serde(default)]
pub scheduled_at: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub started_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub state: Option<serde_json::Value>,
#[serde(default = "default_resurrect")]
pub resurrect: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub throttle_cost: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum JobConflictStrategy {
#[default]
Skip,
Replace,
}
impl JobEnvelope {
pub(crate) fn new<T: Job>(queue: String, job: T) -> Result<Self, OxanaError> {
let job_name = T::worker_name().to_string();
let unique_id = job.unique_id();
let unique = unique_id.is_some();
let resurrect = T::should_resurrect();
let id = match unique_id {
Some(id) => format!("{}/{}", job_name, id),
None => Uuid::new_v4().to_string(),
};
Ok(Self {
id: id.clone(),
queue,
job: JobData {
name: job_name,
args: serde_json::to_value(&job)?,
},
meta: JobMeta {
id,
retries: 0,
unique,
on_conflict: if unique {
Some(job.on_conflict())
} else {
None
},
created_at: chrono::Utc::now().timestamp_micros(),
scheduled_at: chrono::Utc::now().timestamp_micros(),
started_at: None,
state: None,
resurrect,
error: None,
throttle_cost: job.throttle_cost(),
},
})
}
pub(crate) fn new_scheduled<T: Job>(
queue: String,
job: T,
scheduled_at: DateTime<Utc>,
) -> Result<Self, OxanaError> {
Ok(Self::new(queue, job)?.with_scheduled_at(scheduled_at))
}
pub(crate) fn new_cron(
queue: String,
id: String,
name: String,
scheduled_at: i64,
resurrect: bool,
) -> Result<Self, OxanaError> {
let now = chrono::Utc::now().timestamp_micros();
Ok(Self {
id: id.clone(),
queue,
job: JobData {
name,
args: serde_json::json!({}),
},
meta: JobMeta {
id,
retries: 0,
unique: true,
on_conflict: Some(JobConflictStrategy::Skip),
created_at: now,
scheduled_at: scheduled_at.max(now),
started_at: None,
state: None,
resurrect,
error: None,
throttle_cost: None,
},
})
}
pub(crate) fn with_scheduled_at(mut self, scheduled_at: DateTime<Utc>) -> Self {
let now = chrono::Utc::now().timestamp_micros();
self.meta.scheduled_at = scheduled_at.timestamp_micros().max(now);
self
}
pub(crate) fn with_error(mut self, error: String) -> Self {
self.meta.error = Some(error);
self
}
pub(crate) fn with_retries_incremented(self, error: String) -> Self {
Self {
id: self.id.clone(),
queue: self.queue,
job: self.job,
meta: JobMeta {
id: self.id,
retries: self.meta.retries + 1,
unique: self.meta.unique,
on_conflict: self.meta.on_conflict,
created_at: self.meta.created_at,
scheduled_at: self.meta.scheduled_at,
started_at: None,
state: self.meta.state,
resurrect: self.meta.resurrect,
error: Some(error),
throttle_cost: self.meta.throttle_cost,
},
}
}
}
impl JobMeta {
pub fn created_at_secs(&self) -> i64 {
self.created_at / 1000000
}
pub fn created_at_millis(&self) -> i64 {
self.created_at / 1000
}
pub fn scheduled_at_millis(&self) -> i64 {
self.scheduled_at / 1000
}
pub fn scheduled_at_secs(&self) -> i64 {
self.scheduled_at / 1000000
}
pub fn effective_scheduled_at_micros(&self) -> i64 {
if self.scheduled_at > 0 {
self.scheduled_at
} else {
self.created_at
}
}
pub fn latency_micros(&self) -> i64 {
let reference = self
.started_at
.unwrap_or_else(|| chrono::Utc::now().timestamp_micros());
(reference - self.effective_scheduled_at_micros()).max(0)
}
pub fn latency_secs(&self) -> i64 {
self.latency_micros() / 1000000
}
pub fn latency_millis(&self) -> i64 {
self.latency_micros() / 1000
}
pub fn scheduled_at(&self) -> DateTime<Utc> {
DateTime::<Utc>::from_timestamp_micros(self.scheduled_at).unwrap_or_else(Utc::now)
}
pub fn created_at(&self) -> DateTime<Utc> {
DateTime::<Utc>::from_timestamp_micros(self.created_at).unwrap_or_else(Utc::now)
}
pub fn started_at(&self) -> Option<DateTime<Utc>> {
self.started_at
.and_then(DateTime::<Utc>::from_timestamp_micros)
}
pub fn started_at_secs(&self) -> Option<i64> {
self.started_at.map(|t| t / 1000000)
}
pub fn started_at_millis(&self) -> Option<i64> {
self.started_at.map(|t| t / 1000)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_meta(scheduled_at: i64, started_at: Option<i64>) -> JobMeta {
JobMeta {
id: "test".to_string(),
retries: 0,
unique: false,
on_conflict: None,
created_at: scheduled_at,
scheduled_at,
started_at,
state: None,
resurrect: true,
error: None,
throttle_cost: None,
}
}
#[test]
fn test_started_at_none() {
let meta = make_meta(1_000_000, None);
assert!(meta.started_at().is_none());
assert!(meta.started_at_secs().is_none());
assert!(meta.started_at_millis().is_none());
}
#[test]
fn test_started_at_some() {
let scheduled = 1_700_000_000_000_000i64;
let started = scheduled + 5_500_000;
let meta = make_meta(scheduled, Some(started));
assert!(meta.started_at().is_some());
assert_eq!(meta.started_at_secs(), Some(started / 1_000_000));
assert_eq!(meta.started_at_millis(), Some(started / 1_000));
}
#[test]
fn test_latency_uses_started_at_when_available() {
let scheduled = 1_700_000_000_000_000i64;
let started = scheduled + 5_500_000;
let meta = make_meta(scheduled, Some(started));
assert_eq!(meta.latency_micros(), 5_500_000);
assert_eq!(meta.latency_millis(), 5_500);
assert_eq!(meta.latency_secs(), 5);
}
#[test]
fn test_latency_falls_back_to_now_without_started_at() {
let scheduled = chrono::Utc::now().timestamp_micros() - 2_000_000;
let meta = make_meta(scheduled, None);
let latency = meta.latency_micros();
assert!(latency >= 2_000_000);
assert!(latency < 3_000_000);
}
#[test]
fn test_latency_clamped_to_zero() {
let scheduled = 1_700_000_000_000_000i64;
let started = scheduled - 100;
let meta = make_meta(scheduled, Some(started));
assert_eq!(meta.latency_micros(), 0);
}
#[test]
fn test_with_retries_incremented_resets_started_at() {
let envelope = JobEnvelope {
id: "test".to_string(),
queue: "default".to_string(),
job: JobData {
name: "TestJob".to_string(),
args: serde_json::json!({}),
},
meta: JobMeta {
id: "test".to_string(),
retries: 0,
unique: false,
on_conflict: None,
created_at: 1_000_000,
scheduled_at: 1_000_000,
started_at: Some(2_000_000),
state: None,
resurrect: true,
error: None,
throttle_cost: None,
},
};
let retried = envelope.with_retries_incremented("something went wrong".to_string());
assert_eq!(retried.meta.retries, 1);
assert!(retried.meta.started_at.is_none());
assert_eq!(retried.meta.error.as_deref(), Some("something went wrong"));
}
#[test]
fn test_cannot_schedule_in_past() {
let envelope = JobEnvelope {
id: "test".to_string(),
queue: "default".to_string(),
job: JobData {
name: "TestJob".to_string(),
args: serde_json::json!({}),
},
meta: JobMeta {
id: "test".to_string(),
retries: 0,
unique: false,
on_conflict: None,
created_at: 1_000_000,
scheduled_at: 1_000_000,
started_at: None,
state: None,
resurrect: true,
error: None,
throttle_cost: None,
},
};
let past = Utc::now() - chrono::Duration::hours(1);
let envelope = envelope.with_scheduled_at(past);
let drift_micros = Utc::now().timestamp_micros() - envelope.meta.scheduled_at;
assert!(drift_micros >= 0);
assert!(drift_micros < 1_000_000);
}
#[test]
fn test_cannot_schedule_cron_in_past() {
let past = Utc::now().timestamp_micros() - 3_600_000_000;
let envelope = JobEnvelope::new_cron(
"default".to_string(),
"cron-job-1".to_string(),
"CronJob".to_string(),
past,
true,
)
.unwrap();
let drift_micros = Utc::now().timestamp_micros() - envelope.meta.scheduled_at;
assert!(drift_micros >= 0);
assert!(drift_micros < 1_000_000);
}
#[test]
fn test_serde_backward_compatibility() {
let json = r#"{
"id": "test",
"retries": 0,
"unique": false,
"on_conflict": null,
"created_at": 1000000,
"scheduled_at": 1000000,
"state": null,
"resurrect": true
}"#;
let meta: JobMeta =
serde_json::from_str(json).expect("should deserialize without started_at");
assert!(meta.started_at.is_none());
assert!(meta.error.is_none());
}
}