use crate::Error;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[async_trait]
pub trait Job: Send + Sync + 'static {
async fn handle(&self) -> Result<(), Error>;
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}
fn max_retries(&self) -> u32 {
3
}
fn retry_delay(&self, _attempt: u32) -> std::time::Duration {
std::time::Duration::from_secs(5)
}
async fn failed(&self, error: &Error) {
tracing::error!(job = self.name(), error = %error, "Job failed permanently");
}
fn timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(60)
}
}
/// Serializable record describing one queued job.
///
/// Built from a [`Job`] by [`JobPayload::new`] or [`JobPayload::with_delay`]
/// and converted to and from JSON with [`JobPayload::to_json`] and
/// [`JobPayload::from_json`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobPayload {
    /// Unique identifier; [`JobPayload::new`] assigns a random v4 UUID.
    pub id: Uuid,

    /// Job type identifier, copied from [`Job::name`] at construction.
    pub job_type: String,

    /// The job itself, serialized as a JSON string.
    pub data: String,

    /// Name of the queue this payload was created for.
    pub queue: String,

    /// Attempt counter; starts at `0` and is bumped by
    /// [`JobPayload::increment_attempts`].
    pub attempts: u32,

    /// Retry budget, copied from [`Job::max_retries`] at construction.
    pub max_retries: u32,

    /// Timestamp taken when the payload was constructed.
    pub created_at: DateTime<Utc>,

    /// Earliest instant at which [`JobPayload::is_available`] reports `true`.
    pub available_at: DateTime<Utc>,

    /// Timestamp recorded by [`JobPayload::reserve`]; `None` until then.
    pub reserved_at: Option<DateTime<Utc>>,

    /// Optional tenant identifier.
    ///
    /// `#[serde(default)]` lets JSON without this key deserialize as `None`.
    #[serde(default)]
    pub tenant_id: Option<i64>,
}
impl JobPayload {
pub fn new<J: Job + Serialize>(job: &J, queue: &str) -> Result<Self, Error> {
let data =
serde_json::to_string(job).map_err(|e| Error::SerializationFailed(e.to_string()))?;
Ok(Self {
id: Uuid::new_v4(),
job_type: job.name().to_string(),
data,
queue: queue.to_string(),
attempts: 0,
max_retries: job.max_retries(),
created_at: Utc::now(),
available_at: Utc::now(),
reserved_at: None,
tenant_id: None,
})
}
pub fn with_delay<J: Job + Serialize>(
job: &J,
queue: &str,
delay: std::time::Duration,
) -> Result<Self, Error> {
let mut payload = Self::new(job, queue)?;
payload.available_at = Utc::now() + chrono::Duration::from_std(delay).unwrap_or_default();
Ok(payload)
}
pub fn with_tenant_id(mut self, id: Option<i64>) -> Self {
self.tenant_id = id;
self
}
pub fn is_available(&self) -> bool {
Utc::now() >= self.available_at
}
pub fn has_exceeded_retries(&self) -> bool {
self.attempts >= self.max_retries
}
pub fn increment_attempts(&mut self) {
self.attempts += 1;
}
pub fn reserve(&mut self) {
self.reserved_at = Some(Utc::now());
}
pub fn to_json(&self) -> Result<String, Error> {
serde_json::to_string(self).map_err(|e| Error::SerializationFailed(e.to_string()))
}
pub fn from_json(json: &str) -> Result<Self, Error> {
serde_json::from_str(json).map_err(|e| Error::DeserializationFailed(e.to_string()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal serializable job used as a fixture by every test.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestJob {
        value: i32,
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Queue name shared by all fixtures.
    const QUEUE: &str = "default";

    /// The job instance every test enqueues.
    fn fixture_job() -> TestJob {
        TestJob { value: 42 }
    }

    /// A freshly created, undelayed payload on the shared queue.
    fn fixture_payload() -> JobPayload {
        JobPayload::new(&fixture_job(), QUEUE).unwrap()
    }

    #[test]
    fn test_job_payload_creation() {
        let created = fixture_payload();
        assert_eq!(created.queue, "default");
        assert_eq!(created.attempts, 0);
        assert!(created.is_available());
    }

    #[test]
    fn test_job_payload_with_delay() {
        let one_minute = std::time::Duration::from_secs(60);
        let delayed = JobPayload::with_delay(&fixture_job(), QUEUE, one_minute).unwrap();
        assert!(!delayed.is_available());
    }

    #[test]
    fn test_job_payload_serialization() {
        let original = fixture_payload();
        let encoded = original.to_json().unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();
        assert_eq!(original.id, decoded.id);
        assert_eq!(original.queue, decoded.queue);
    }

    #[test]
    fn test_tenant_id_none_by_default() {
        assert_eq!(fixture_payload().tenant_id, None);
    }

    #[test]
    fn test_tenant_id_none_serializes_as_null() {
        let encoded = fixture_payload().to_json().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&encoded).unwrap();
        // The key is emitted explicitly as `null` rather than omitted.
        assert_eq!(parsed["tenant_id"], serde_json::Value::Null);
    }

    #[test]
    fn test_tenant_id_some_round_trips() {
        let encoded = fixture_payload()
            .with_tenant_id(Some(42))
            .to_json()
            .unwrap();
        let decoded = JobPayload::from_json(&encoded).unwrap();
        assert_eq!(decoded.tenant_id, Some(42));
    }

    #[test]
    fn test_old_payload_without_tenant_id_deserializes_to_none() {
        // JSON with no `tenant_id` key at all must still deserialize.
        let old_json = r#"{"id":"550e8400-e29b-41d4-a716-446655440000","job_type":"test","data":"{}","queue":"default","attempts":0,"max_retries":3,"created_at":"2024-01-01T00:00:00Z","available_at":"2024-01-01T00:00:00Z","reserved_at":null}"#;
        let decoded = JobPayload::from_json(old_json).unwrap();
        assert_eq!(decoded.tenant_id, None);
    }

    #[test]
    fn test_with_tenant_id_builder() {
        // The builder stores whatever it is given, including `None`.
        for tenant in [Some(99), None] {
            let built = fixture_payload().with_tenant_id(tenant);
            assert_eq!(built.tenant_id, tenant);
        }
    }
}