use crate::cron::CronSchedule;
use crate::priority::JobPriority;
use crate::retry::RetryStrategy;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[cfg(any(feature = "postgres", feature = "mysql"))]
use sqlx::{Decode, Encode, Type};
#[cfg(feature = "postgres")]
use sqlx::Postgres;
#[cfg(feature = "mysql")]
use sqlx::MySql;
/// Unique identifier assigned to every job (random UUID v4, see `Job::new`).
pub type JobId = Uuid;

/// Lifecycle state of a job as it moves through the queue.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum JobStatus {
    /// Enqueued and waiting to be picked up.
    Pending,
    /// Currently executing on a worker.
    Running,
    /// Finished successfully.
    Completed,
    /// Last attempt failed (may still be retried).
    Failed,
    /// Permanently failed; retries exhausted (see `Job::is_dead`).
    Dead,
    /// Exceeded its configured execution timeout.
    TimedOut,
    /// Scheduled for another attempt after a failure.
    Retrying,
    /// Removed from active processing for retention.
    Archived,
}
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Pending => "Pending",
JobStatus::Running => "Running",
JobStatus::Completed => "Completed",
JobStatus::Failed => "Failed",
JobStatus::Dead => "Dead",
JobStatus::TimedOut => "TimedOut",
JobStatus::Retrying => "Retrying",
JobStatus::Archived => "Archived",
}
}
}
#[cfg(feature = "postgres")]
impl Type<Postgres> for JobStatus {
    /// Maps `JobStatus` to the same Postgres type as `String` (text),
    /// since statuses are stored as their string names.
    fn type_info() -> sqlx::postgres::PgTypeInfo {
        <String as Type<Postgres>>::type_info()
    }
}
#[cfg(feature = "postgres")]
impl Encode<'_, Postgres> for JobStatus {
    /// Encodes the status as its canonical string form.
    ///
    /// Delegates to [`JobStatus::as_str`] so the wire format and the
    /// in-code variant names can never drift apart (the previous inline
    /// match duplicated `as_str` verbatim).
    fn encode_by_ref(
        &self,
        buf: &mut sqlx::postgres::PgArgumentBuffer,
    ) -> Result<sqlx::encode::IsNull, Box<dyn std::error::Error + Send + Sync + 'static>> {
        <&str as Encode<'_, Postgres>>::encode_by_ref(&self.as_str(), buf)
    }
}
#[cfg(feature = "postgres")]
impl Decode<'_, Postgres> for JobStatus {
    /// Decodes a status stored as text.
    ///
    /// Surrounding double quotes are stripped first — presumably for
    /// backward compatibility with rows written as JSON-serialized
    /// strings (`"Pending"`); see
    /// `test_job_status_backward_compatibility_string_matching`.
    fn decode(value: sqlx::postgres::PgValueRef<'_>) -> Result<Self, sqlx::error::BoxDynError> {
        let status_str = <String as Decode<Postgres>>::decode(value)?;
        // Accept both `Pending` and `"Pending"` (legacy JSON encoding).
        let cleaned_str = status_str.trim_matches('"');
        match cleaned_str {
            "Pending" => Ok(JobStatus::Pending),
            "Running" => Ok(JobStatus::Running),
            "Completed" => Ok(JobStatus::Completed),
            "Failed" => Ok(JobStatus::Failed),
            "Dead" => Ok(JobStatus::Dead),
            "TimedOut" => Ok(JobStatus::TimedOut),
            "Retrying" => Ok(JobStatus::Retrying),
            "Archived" => Ok(JobStatus::Archived),
            // Report the raw (untrimmed) value to aid debugging.
            _ => Err(format!("Unknown job status: {}", status_str).into()),
        }
    }
}
#[cfg(feature = "mysql")]
impl Type<MySql> for JobStatus {
    /// Maps `JobStatus` to the same MySQL type as `String` (text),
    /// since statuses are stored as their string names.
    fn type_info() -> sqlx::mysql::MySqlTypeInfo {
        <String as Type<MySql>>::type_info()
    }
}
#[cfg(feature = "mysql")]
impl Encode<'_, MySql> for JobStatus {
    /// Encodes the status as its canonical string form.
    ///
    /// Delegates to [`JobStatus::as_str`] so the wire format and the
    /// in-code variant names can never drift apart (the previous inline
    /// match duplicated `as_str` verbatim).
    fn encode_by_ref(
        &self,
        buf: &mut Vec<u8>,
    ) -> Result<sqlx::encode::IsNull, Box<dyn std::error::Error + Send + Sync + 'static>> {
        <&str as Encode<'_, MySql>>::encode_by_ref(&self.as_str(), buf)
    }
}
#[cfg(feature = "mysql")]
impl Decode<'_, MySql> for JobStatus {
    /// Decodes a status stored as text.
    ///
    /// Surrounding double quotes are stripped first — presumably for
    /// backward compatibility with rows written as JSON-serialized
    /// strings (`"Pending"`); see
    /// `test_job_status_backward_compatibility_string_matching`.
    fn decode(value: sqlx::mysql::MySqlValueRef<'_>) -> Result<Self, sqlx::error::BoxDynError> {
        let status_str = <String as Decode<MySql>>::decode(value)?;
        // Accept both `Pending` and `"Pending"` (legacy JSON encoding).
        let cleaned_str = status_str.trim_matches('"');
        match cleaned_str {
            "Pending" => Ok(JobStatus::Pending),
            "Running" => Ok(JobStatus::Running),
            "Completed" => Ok(JobStatus::Completed),
            "Failed" => Ok(JobStatus::Failed),
            "Dead" => Ok(JobStatus::Dead),
            "TimedOut" => Ok(JobStatus::TimedOut),
            "Retrying" => Ok(JobStatus::Retrying),
            "Archived" => Ok(JobStatus::Archived),
            // Report the raw (untrimmed) value to aid debugging.
            _ => Err(format!("Unknown job status: {}", status_str).into()),
        }
    }
}
/// Where (if anywhere) a job's result value is persisted.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ResultStorage {
    /// Persist results in the backing database.
    Database,
    /// Keep results in process memory only.
    Memory,
    /// Do not store results at all (the default).
    None,
}

impl Default for ResultStorage {
    /// Results are not stored unless explicitly requested.
    fn default() -> Self {
        Self::None
    }
}
/// Configuration controlling how (and for how long) a job's result is stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResultConfig {
    /// Storage backend for the result.
    pub storage: ResultStorage,
    /// Optional time-to-live after which a stored result expires.
    pub ttl: Option<std::time::Duration>,
    /// Optional upper bound on the stored result size, in bytes.
    pub max_size_bytes: Option<usize>,
}
impl ResultConfig {
    /// Builds a config targeting `storage`, with no TTL and no size cap.
    pub fn new(storage: ResultStorage) -> Self {
        Self {
            storage,
            ttl: None,
            max_size_bytes: None,
        }
    }

    /// Returns this config with results set to expire after `ttl`.
    pub fn with_ttl(self, ttl: std::time::Duration) -> Self {
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Returns this config with stored results capped at `max_bytes`.
    pub fn with_max_size(self, max_bytes: usize) -> Self {
        Self {
            max_size_bytes: Some(max_bytes),
            ..self
        }
    }
}
impl Default for ResultConfig {
    /// No result storage, no TTL, no size limit.
    fn default() -> Self {
        Self {
            storage: ResultStorage::None,
            ttl: None,
            max_size_bytes: None,
        }
    }
}
/// A unit of work queued for asynchronous execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
    /// Unique identifier (UUID v4, assigned at creation).
    pub id: JobId,
    /// Name of the queue this job belongs to.
    pub queue_name: String,
    /// Arbitrary JSON payload handed to the worker.
    pub payload: serde_json::Value,
    /// Current lifecycle state.
    pub status: JobStatus,
    /// Number of attempts made so far.
    pub attempts: i32,
    /// Maximum attempts before retries are considered exhausted.
    pub max_attempts: i32,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// Earliest time the job should run.
    pub scheduled_at: DateTime<Utc>,
    /// When the current/last attempt started, if any.
    pub started_at: Option<DateTime<Utc>>,
    /// When the job completed successfully, if it did.
    pub completed_at: Option<DateTime<Utc>>,
    /// When the job last failed, if it did.
    pub failed_at: Option<DateTime<Utc>>,
    /// When the job timed out, if it did.
    pub timed_out_at: Option<DateTime<Utc>>,
    /// Per-attempt execution time limit (see `should_timeout`).
    pub timeout: Option<std::time::Duration>,
    /// Human-readable description of the last error.
    pub error_message: Option<String>,
    /// Scheduling priority.
    pub priority: JobPriority,
    /// Cron expression driving recurring execution, if any.
    pub cron_schedule: Option<String>,
    /// Next computed execution time for recurring jobs.
    pub next_run_at: Option<DateTime<Utc>>,
    /// Whether the job re-schedules itself after each run.
    pub recurring: bool,
    /// Timezone name used to evaluate the cron schedule
    /// (assumption based on `CronSchedule::with_timezone` usage — TODO confirm).
    pub timezone: Option<String>,
    /// Batch this job belongs to, if any.
    pub batch_id: Option<crate::batch::BatchId>,
    /// How (and for how long) the job's result is stored.
    pub result_config: ResultConfig,
    /// The stored result value, when result storage is enabled.
    pub result_data: Option<serde_json::Value>,
    /// When the result was stored.
    pub result_stored_at: Option<DateTime<Utc>>,
    /// When the stored result expires.
    pub result_expires_at: Option<DateTime<Utc>>,
    /// Backoff strategy used between retries, if customized.
    pub retry_strategy: Option<RetryStrategy>,
    /// Jobs that must finish before this one may run.
    pub depends_on: Vec<JobId>,
    /// Jobs waiting on this one.
    pub dependents: Vec<JobId>,
    /// Aggregate state of this job's dependencies.
    pub dependency_status: crate::workflow::DependencyStatus,
    /// Workflow this job is part of, if any.
    pub workflow_id: Option<crate::workflow::WorkflowId>,
    /// Human-readable name of that workflow.
    pub workflow_name: Option<String>,
    /// Distributed-tracing trace id.
    pub trace_id: Option<String>,
    /// Correlation id for cross-service log stitching.
    pub correlation_id: Option<String>,
    /// Span id of the enqueuing parent span.
    pub parent_span_id: Option<String>,
    /// Serialized span context for trace propagation.
    pub span_context: Option<String>,
    /// Encryption settings for the payload (feature-gated).
    #[cfg(feature = "encryption")]
    pub encryption_config: Option<crate::encryption::EncryptionConfig>,
    /// Names of payload fields containing PII.
    pub pii_fields: Vec<String>,
    /// Retention/deletion policy for encrypted data (feature-gated).
    #[cfg(feature = "encryption")]
    pub retention_policy: Option<crate::encryption::RetentionPolicy>,
    /// Whether the payload has been encrypted.
    pub is_encrypted: bool,
    /// Encrypted form of the payload (feature-gated).
    #[cfg(feature = "encryption")]
    pub encrypted_payload: Option<crate::encryption::EncryptedPayload>,
}
impl Job {
pub fn new(queue_name: String, payload: serde_json::Value) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4(),
queue_name,
payload,
status: JobStatus::Pending,
attempts: 0,
max_attempts: 3,
created_at: now,
scheduled_at: now,
started_at: None,
completed_at: None,
failed_at: None,
timed_out_at: None,
timeout: None,
error_message: None,
priority: JobPriority::default(),
cron_schedule: None,
next_run_at: None,
recurring: false,
timezone: None,
batch_id: None,
result_config: ResultConfig::default(),
result_data: None,
result_stored_at: None,
result_expires_at: None,
retry_strategy: None,
depends_on: Vec::new(),
dependents: Vec::new(),
dependency_status: crate::workflow::DependencyStatus::None,
workflow_id: None,
workflow_name: None,
trace_id: None,
correlation_id: None,
parent_span_id: None,
span_context: None,
#[cfg(feature = "encryption")]
encryption_config: None,
pii_fields: Vec::new(),
#[cfg(feature = "encryption")]
retention_policy: None,
is_encrypted: false,
#[cfg(feature = "encryption")]
encrypted_payload: None,
}
}
pub fn with_delay(
queue_name: String,
payload: serde_json::Value,
delay: chrono::Duration,
) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4(),
queue_name,
payload,
status: JobStatus::Pending,
attempts: 0,
max_attempts: 3,
created_at: now,
scheduled_at: now + delay,
started_at: None,
completed_at: None,
failed_at: None,
timed_out_at: None,
timeout: None,
error_message: None,
priority: JobPriority::default(),
cron_schedule: None,
next_run_at: None,
recurring: false,
timezone: None,
batch_id: None,
result_config: ResultConfig::default(),
result_data: None,
result_stored_at: None,
result_expires_at: None,
retry_strategy: None,
depends_on: Vec::new(),
dependents: Vec::new(),
dependency_status: crate::workflow::DependencyStatus::None,
workflow_id: None,
workflow_name: None,
trace_id: None,
correlation_id: None,
parent_span_id: None,
span_context: None,
#[cfg(feature = "encryption")]
encryption_config: None,
pii_fields: Vec::new(),
#[cfg(feature = "encryption")]
retention_policy: None,
is_encrypted: false,
#[cfg(feature = "encryption")]
encrypted_payload: None,
}
}
pub fn with_max_attempts(mut self, max_attempts: i32) -> Self {
self.max_attempts = max_attempts;
self
}
pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_priority(mut self, priority: JobPriority) -> Self {
self.priority = priority;
self
}
pub fn as_critical(mut self) -> Self {
self.priority = JobPriority::Critical;
self
}
pub fn as_high_priority(mut self) -> Self {
self.priority = JobPriority::High;
self
}
pub fn as_low_priority(mut self) -> Self {
self.priority = JobPriority::Low;
self
}
pub fn as_background(mut self) -> Self {
self.priority = JobPriority::Background;
self
}
pub fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
self.retry_strategy = Some(strategy);
self
}
pub fn with_exponential_backoff(
mut self,
base: std::time::Duration,
multiplier: f64,
max_delay: std::time::Duration,
) -> Self {
self.retry_strategy = Some(RetryStrategy::exponential(
base,
multiplier,
Some(max_delay),
));
self
}
pub fn with_linear_backoff(
mut self,
base: std::time::Duration,
increment: std::time::Duration,
max_delay: Option<std::time::Duration>,
) -> Self {
self.retry_strategy = Some(RetryStrategy::linear(base, increment, max_delay));
self
}
pub fn with_fibonacci_backoff(
mut self,
base: std::time::Duration,
max_delay: Option<std::time::Duration>,
) -> Self {
self.retry_strategy = Some(RetryStrategy::fibonacci(base, max_delay));
self
}
pub fn with_cron_schedule(
queue_name: String,
payload: serde_json::Value,
cron_schedule: CronSchedule,
) -> Result<Self, crate::cron::CronError> {
let now = Utc::now();
let next_run = cron_schedule.next_execution_from_now();
Ok(Self {
id: Uuid::new_v4(),
queue_name,
payload,
status: JobStatus::Pending,
attempts: 0,
max_attempts: 3,
created_at: now,
scheduled_at: next_run.unwrap_or(now),
started_at: None,
completed_at: None,
failed_at: None,
timed_out_at: None,
timeout: None,
error_message: None,
priority: JobPriority::default(),
cron_schedule: Some(cron_schedule.expression.clone()),
next_run_at: next_run,
recurring: true,
timezone: Some(cron_schedule.timezone.clone()),
batch_id: None,
result_config: ResultConfig::default(),
result_data: None,
result_stored_at: None,
result_expires_at: None,
retry_strategy: None,
depends_on: Vec::new(),
dependents: Vec::new(),
dependency_status: crate::workflow::DependencyStatus::None,
workflow_id: None,
workflow_name: None,
trace_id: None,
correlation_id: None,
parent_span_id: None,
span_context: None,
#[cfg(feature = "encryption")]
encryption_config: None,
pii_fields: Vec::new(),
#[cfg(feature = "encryption")]
retention_policy: None,
is_encrypted: false,
#[cfg(feature = "encryption")]
encrypted_payload: None,
})
}
pub fn with_cron(
mut self,
cron_schedule: CronSchedule,
) -> Result<Self, crate::cron::CronError> {
let next_run = cron_schedule.next_execution_from_now();
self.cron_schedule = Some(cron_schedule.expression.clone());
self.next_run_at = next_run;
self.recurring = true;
self.timezone = Some(cron_schedule.timezone.clone());
self.scheduled_at = next_run.unwrap_or(self.scheduled_at);
Ok(self)
}
pub fn as_recurring(mut self) -> Self {
self.recurring = true;
self
}
pub fn with_timezone(mut self, timezone: String) -> Self {
self.timezone = Some(timezone);
self
}
pub fn with_result_storage(mut self, storage: ResultStorage) -> Self {
self.result_config.storage = storage;
self
}
pub fn with_result_ttl(mut self, ttl: std::time::Duration) -> Self {
self.result_config.ttl = Some(ttl);
self
}
pub fn with_result_config(mut self, config: ResultConfig) -> Self {
self.result_config = config;
self
}
pub fn has_result_storage(&self) -> bool {
self.result_config.storage != ResultStorage::None
}
pub fn has_result_data(&self) -> bool {
self.result_data.is_some()
}
pub fn is_dead(&self) -> bool {
self.status == JobStatus::Dead
}
pub fn is_timed_out(&self) -> bool {
self.status == JobStatus::TimedOut
}
pub fn is_critical(&self) -> bool {
self.priority == JobPriority::Critical
}
pub fn is_high_priority(&self) -> bool {
self.priority == JobPriority::High
}
pub fn is_normal_priority(&self) -> bool {
self.priority == JobPriority::Normal
}
pub fn is_low_priority(&self) -> bool {
self.priority == JobPriority::Low
}
pub fn is_background(&self) -> bool {
self.priority == JobPriority::Background
}
pub fn priority_value(&self) -> i32 {
self.priority.as_i32()
}
pub fn has_exhausted_retries(&self) -> bool {
self.attempts >= self.max_attempts
}
pub fn should_timeout(&self) -> bool {
if let (Some(started_at), Some(timeout)) = (self.started_at, self.timeout) {
let elapsed = Utc::now() - started_at;
let timeout_duration = chrono::Duration::from_std(timeout).unwrap_or_default();
elapsed >= timeout_duration
} else {
false
}
}
pub fn age(&self) -> chrono::Duration {
Utc::now() - self.created_at
}
pub fn processing_duration(&self) -> Option<chrono::Duration> {
self.started_at.map(|started| {
self.completed_at
.or(self.failed_at)
.or(self.timed_out_at)
.unwrap_or_else(Utc::now)
- started
})
}
pub fn is_recurring(&self) -> bool {
self.recurring
}
pub fn has_cron_schedule(&self) -> bool {
self.cron_schedule.is_some()
}
pub fn get_cron_schedule(&self) -> Option<Result<CronSchedule, crate::cron::CronError>> {
self.cron_schedule
.as_ref()
.map(|expr| match &self.timezone {
Some(tz) => CronSchedule::with_timezone(expr, tz),
None => CronSchedule::new(expr),
})
}
pub fn calculate_next_run(&self) -> Option<DateTime<Utc>> {
if !self.recurring {
return None;
}
if let Some(cron_schedule) = self.get_cron_schedule() {
match cron_schedule {
Ok(schedule) => schedule.next_execution_from_now(),
Err(_) => None,
}
} else {
None
}
}
pub fn prepare_for_next_run(&mut self) -> Option<DateTime<Utc>> {
if !self.recurring {
return None;
}
let next_run = self.calculate_next_run();
if let Some(next_time) = next_run {
self.status = JobStatus::Pending;
self.attempts = 0;
self.scheduled_at = next_time;
self.next_run_at = Some(next_time);
self.started_at = None;
self.completed_at = None;
self.failed_at = None;
self.timed_out_at = None;
self.error_message = None;
}
next_run
}
pub fn depends_on(mut self, job_id: &JobId) -> Self {
self.depends_on.push(*job_id);
self.dependency_status = crate::workflow::DependencyStatus::Waiting;
self
}
pub fn depends_on_jobs(mut self, job_ids: &[JobId]) -> Self {
self.depends_on.extend_from_slice(job_ids);
if !job_ids.is_empty() {
self.dependency_status = crate::workflow::DependencyStatus::Waiting;
}
self
}
pub fn with_workflow(
mut self,
workflow_id: crate::workflow::WorkflowId,
workflow_name: impl Into<String>,
) -> Self {
self.workflow_id = Some(workflow_id);
self.workflow_name = Some(workflow_name.into());
self
}
pub fn has_dependencies(&self) -> bool {
!self.depends_on.is_empty()
}
pub fn is_part_of_workflow(&self) -> bool {
self.workflow_id.is_some()
}
pub fn dependencies_satisfied(&self) -> bool {
matches!(
self.dependency_status,
crate::workflow::DependencyStatus::None | crate::workflow::DependencyStatus::Satisfied
)
}
pub fn dependencies_failed(&self) -> bool {
matches!(
self.dependency_status,
crate::workflow::DependencyStatus::Failed
)
}
pub fn dependency_count(&self) -> usize {
self.depends_on.len()
}
pub fn dependent_count(&self) -> usize {
self.dependents.len()
}
pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
self.trace_id = Some(trace_id.into());
self
}
pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
self.correlation_id = Some(correlation_id.into());
self
}
pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
self.parent_span_id = Some(parent_span_id.into());
self
}
pub fn with_span_context(mut self, span_context: impl Into<String>) -> Self {
self.span_context = Some(span_context.into());
self
}
pub fn with_tracing_id(mut self, id: impl Into<String>) -> Self {
let id_string = id.into();
self.trace_id = Some(id_string.clone());
self.correlation_id = Some(id_string);
self
}
pub fn has_tracing_info(&self) -> bool {
self.trace_id.is_some()
|| self.correlation_id.is_some()
|| self.parent_span_id.is_some()
|| self.span_context.is_some()
}
pub fn get_trace_id(&self) -> Option<&str> {
self.trace_id.as_deref()
}
pub fn get_correlation_id(&self) -> Option<&str> {
self.correlation_id.as_deref()
}
#[cfg(feature = "encryption")]
pub fn with_encryption(mut self, config: crate::encryption::EncryptionConfig) -> Self {
self.encryption_config = Some(config);
self
}
pub fn with_pii_fields(mut self, pii_fields: Vec<impl Into<String>>) -> Self {
self.pii_fields = pii_fields.into_iter().map(|f| f.into()).collect();
self
}
#[cfg(feature = "encryption")]
pub fn with_retention_policy(mut self, policy: crate::encryption::RetentionPolicy) -> Self {
self.retention_policy = Some(policy);
self
}
pub fn has_encryption(&self) -> bool {
#[cfg(feature = "encryption")]
{
self.encryption_config.is_some()
}
#[cfg(not(feature = "encryption"))]
{
false
}
}
pub fn has_pii_fields(&self) -> bool {
!self.pii_fields.is_empty()
}
pub fn is_payload_encrypted(&self) -> bool {
self.is_encrypted
}
pub fn get_pii_fields(&self) -> &[String] {
&self.pii_fields
}
#[cfg(feature = "encryption")]
pub fn get_encryption_config(&self) -> Option<&crate::encryption::EncryptionConfig> {
self.encryption_config.as_ref()
}
#[cfg(feature = "encryption")]
pub fn get_retention_policy(&self) -> Option<&crate::encryption::RetentionPolicy> {
self.retention_policy.as_ref()
}
pub fn should_cleanup_encrypted_data(&self) -> bool {
#[cfg(feature = "encryption")]
{
if let Some(encrypted_payload) = &self.encrypted_payload {
encrypted_payload.should_delete_now()
} else if let Some(retention_policy) = &self.retention_policy {
retention_policy.should_delete_now(
self.created_at,
self.completed_at,
self.encryption_config
.as_ref()
.and_then(|c| c.default_retention),
)
} else {
false
}
}
#[cfg(not(feature = "encryption"))]
{
false
}
}
}
// Unit tests for `Job` construction/builders, `JobStatus` semantics, the
// timeout helpers, and serde round-tripping.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_job_new() {
        let queue_name = "test_queue".to_string();
        let payload = json!({"key": "value"});
        let job = Job::new(queue_name.clone(), payload.clone());
        assert_eq!(job.queue_name, queue_name);
        assert_eq!(job.payload, payload);
        assert_eq!(job.status, JobStatus::Pending);
        assert_eq!(job.attempts, 0);
        assert_eq!(job.max_attempts, 3);
        assert!(job.started_at.is_none());
        assert!(job.completed_at.is_none());
        assert!(job.failed_at.is_none());
        assert!(job.error_message.is_none());
        // A fresh job is scheduled to run immediately.
        assert_eq!(job.created_at, job.scheduled_at);
    }

    #[test]
    fn test_job_with_delay() {
        let queue_name = "test_queue".to_string();
        let payload = json!({"key": "value"});
        let delay = chrono::Duration::minutes(5);
        let job = Job::with_delay(queue_name.clone(), payload.clone(), delay);
        assert_eq!(job.queue_name, queue_name);
        assert_eq!(job.payload, payload);
        assert_eq!(job.status, JobStatus::Pending);
        assert_eq!(job.attempts, 0);
        assert_eq!(job.max_attempts, 3);
        assert!(job.scheduled_at > job.created_at);
        assert_eq!(job.scheduled_at - job.created_at, delay);
    }

    #[test]
    fn test_job_with_max_attempts() {
        let queue_name = "test_queue".to_string();
        let payload = json!({"key": "value"});
        let job = Job::new(queue_name, payload).with_max_attempts(5);
        assert_eq!(job.max_attempts, 5);
    }

    #[test]
    fn test_job_with_delay_and_max_attempts() {
        let queue_name = "test_queue".to_string();
        let payload = json!({"key": "value"});
        let delay = chrono::Duration::hours(1);
        let job = Job::with_delay(queue_name, payload, delay).with_max_attempts(10);
        assert_eq!(job.max_attempts, 10);
        assert!(job.scheduled_at > job.created_at);
    }

    #[test]
    fn test_job_status_equality() {
        assert_eq!(JobStatus::Pending, JobStatus::Pending);
        assert_eq!(JobStatus::Running, JobStatus::Running);
        assert_eq!(JobStatus::Completed, JobStatus::Completed);
        assert_eq!(JobStatus::Failed, JobStatus::Failed);
        assert_eq!(JobStatus::Dead, JobStatus::Dead);
        assert_eq!(JobStatus::TimedOut, JobStatus::TimedOut);
        assert_eq!(JobStatus::Retrying, JobStatus::Retrying);
        assert_ne!(JobStatus::Pending, JobStatus::Running);
        assert_ne!(JobStatus::Completed, JobStatus::Failed);
        assert_ne!(JobStatus::Failed, JobStatus::Dead);
        assert_ne!(JobStatus::Dead, JobStatus::TimedOut);
        assert_ne!(JobStatus::TimedOut, JobStatus::Failed);
    }

    // Round-trip through serde_json and verify the identifying fields survive.
    #[test]
    fn test_job_serialization() {
        let job = Job::new("test".to_string(), json!({"data": "test"}));
        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: Job = serde_json::from_str(&serialized).unwrap();
        assert_eq!(job.id, deserialized.id);
        assert_eq!(job.queue_name, deserialized.queue_name);
        assert_eq!(job.payload, deserialized.payload);
        assert_eq!(job.status, deserialized.status);
        assert_eq!(job.attempts, deserialized.attempts);
        assert_eq!(job.max_attempts, deserialized.max_attempts);
    }

    #[test]
    fn test_job_status_serialization() {
        let statuses = vec![
            JobStatus::Pending,
            JobStatus::Running,
            JobStatus::Completed,
            JobStatus::Failed,
            JobStatus::Dead,
            JobStatus::TimedOut,
            JobStatus::Retrying,
        ];
        for status in statuses {
            let serialized = serde_json::to_string(&status).unwrap();
            let deserialized: JobStatus = serde_json::from_str(&serialized).unwrap();
            assert_eq!(status, deserialized);
        }
    }

    // Exhausted retries and Dead status are independent: the queue decides
    // when to transition one into the other.
    #[test]
    fn test_job_dead_status_methods() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        assert!(!job.is_dead());
        assert!(!job.has_exhausted_retries());
        job.attempts = 3;
        job.max_attempts = 3;
        assert!(job.has_exhausted_retries());
        assert!(!job.is_dead());
        job.status = JobStatus::Dead;
        job.failed_at = Some(Utc::now());
        assert!(job.is_dead());
        assert!(job.has_exhausted_retries());
    }

    #[test]
    fn test_job_processing_duration() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        assert!(job.processing_duration().is_none());
        let start_time = Utc::now();
        job.started_at = Some(start_time);
        // Still running: duration is measured against "now".
        let duration = job.processing_duration().unwrap();
        assert!(duration.num_milliseconds() >= 0);
        let completion_time = start_time + chrono::Duration::seconds(5);
        job.completed_at = Some(completion_time);
        let final_duration = job.processing_duration().unwrap();
        assert_eq!(final_duration.num_seconds(), 5);
    }

    #[test]
    fn test_job_age() {
        let job = Job::new("test".to_string(), json!({"data": "test"}));
        let age = job.age();
        assert!(age.num_milliseconds() >= 0);
        assert!(age.num_seconds() < 1);
    }

    #[test]
    fn test_job_with_timeout() {
        let timeout = std::time::Duration::from_secs(30);
        let job = Job::new("test".to_string(), json!({"data": "test"})).with_timeout(timeout);
        assert_eq!(job.timeout, Some(timeout));
        assert!(!job.is_timed_out());
    }

    #[test]
    fn test_job_timeout_status_methods() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        assert!(!job.is_timed_out());
        job.status = JobStatus::TimedOut;
        job.timed_out_at = Some(Utc::now());
        assert!(job.is_timed_out());
    }

    #[test]
    fn test_job_should_timeout() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}))
            .with_timeout(std::time::Duration::from_millis(100));
        // Not started yet: cannot time out.
        assert!(!job.should_timeout());
        job.started_at = Some(Utc::now() - chrono::Duration::milliseconds(200));
        assert!(job.should_timeout());
        // No timeout configured: never times out, no matter how long it runs.
        let mut job_no_timeout = Job::new("test".to_string(), json!({"data": "test"}));
        job_no_timeout.started_at = Some(Utc::now() - chrono::Duration::hours(1));
        assert!(!job_no_timeout.should_timeout());
    }

    #[test]
    fn test_job_with_delay_and_timeout() {
        let delay = chrono::Duration::minutes(5);
        let timeout = std::time::Duration::from_secs(120);
        let job = Job::with_delay("test".to_string(), json!({"data": "test"}), delay)
            .with_timeout(timeout)
            .with_max_attempts(5);
        assert_eq!(job.timeout, Some(timeout));
        assert_eq!(job.max_attempts, 5);
        assert!(job.scheduled_at > job.created_at);
        assert_eq!(job.scheduled_at - job.created_at, delay);
    }

    #[test]
    fn test_processing_duration_with_timeout() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        let start_time = Utc::now() - chrono::Duration::seconds(5);
        let timeout_time = start_time + chrono::Duration::seconds(3);
        job.started_at = Some(start_time);
        job.timed_out_at = Some(timeout_time);
        // Timed-out jobs measure up to timed_out_at, not "now".
        let duration = job.processing_duration().unwrap();
        assert_eq!(duration.num_seconds(), 3);
    }

    #[test]
    fn test_timeout_builder_methods() {
        let job = Job::new("test".to_string(), json!({"key": "value"}))
            .with_timeout(std::time::Duration::from_secs(120))
            .with_max_attempts(5);
        assert_eq!(job.timeout, Some(std::time::Duration::from_secs(120)));
        assert_eq!(job.max_attempts, 5);
        assert_eq!(job.queue_name, "test");
    }

    #[test]
    fn test_job_timeout_edge_cases() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        assert!(!job.should_timeout());
        job.timeout = Some(std::time::Duration::from_millis(100));
        assert!(!job.should_timeout());
        job.started_at = Some(Utc::now() - chrono::Duration::milliseconds(50));
        assert!(!job.should_timeout());
        job.started_at = Some(Utc::now() - chrono::Duration::milliseconds(150));
        assert!(job.should_timeout());
    }

    #[test]
    fn test_job_status_transitions_with_timeout() {
        let mut job = Job::new("test".to_string(), json!({"data": "test"}));
        assert_eq!(job.status, JobStatus::Pending);
        assert!(!job.is_timed_out());
        job.status = JobStatus::TimedOut;
        job.timed_out_at = Some(Utc::now());
        assert!(job.is_timed_out());
        // TimedOut is distinct from Dead.
        assert!(!job.is_dead());
    }

    #[test]
    fn test_timeout_serialization_compatibility() {
        let original_job = Job::new("test_queue".to_string(), json!({"data": "test"}))
            .with_timeout(std::time::Duration::from_secs(300))
            .with_max_attempts(5);
        let serialized = serde_json::to_string(&original_job).unwrap();
        let deserialized: Job = serde_json::from_str(&serialized).unwrap();
        assert_eq!(original_job.timeout, deserialized.timeout);
        assert_eq!(original_job.timed_out_at, deserialized.timed_out_at);
        assert_eq!(original_job.status, deserialized.status);
    }

    #[test]
    fn test_job_with_all_timeout_fields() {
        let timeout_duration = std::time::Duration::from_secs(60);
        let mut job = Job::new("comprehensive_test".to_string(), json!({"test": true}))
            .with_timeout(timeout_duration)
            .with_max_attempts(3);
        job.started_at = Some(Utc::now() - chrono::Duration::seconds(30));
        job.status = JobStatus::Running;
        assert!(!job.should_timeout());
        job.status = JobStatus::TimedOut;
        job.timed_out_at = Some(Utc::now());
        job.error_message = Some("Job timed out after 60s".to_string());
        assert!(job.is_timed_out());
        assert_eq!(job.timeout, Some(timeout_duration));
        assert!(job.timed_out_at.is_some());
        assert!(job.error_message.is_some());
    }

    // Mirrors the Decode impls' quote-trimming: both `Pending` and
    // `"Pending"` (legacy JSON-encoded) forms must parse.
    #[test]
    fn test_job_status_backward_compatibility_string_matching() {
        let test_cases = [
            ("Pending", JobStatus::Pending),
            ("\"Pending\"", JobStatus::Pending),
            ("Running", JobStatus::Running),
            ("\"Running\"", JobStatus::Running),
            ("Completed", JobStatus::Completed),
            ("\"Completed\"", JobStatus::Completed),
            ("Failed", JobStatus::Failed),
            ("\"Failed\"", JobStatus::Failed),
            ("Dead", JobStatus::Dead),
            ("\"Dead\"", JobStatus::Dead),
            ("TimedOut", JobStatus::TimedOut),
            ("\"TimedOut\"", JobStatus::TimedOut),
            ("Retrying", JobStatus::Retrying),
            ("\"Retrying\"", JobStatus::Retrying),
            ("Archived", JobStatus::Archived),
            ("\"Archived\"", JobStatus::Archived),
        ];
        for (input, expected) in &test_cases {
            let cleaned_str = input.trim_matches('"');
            let parsed_status = match cleaned_str {
                "Pending" => JobStatus::Pending,
                "Running" => JobStatus::Running,
                "Completed" => JobStatus::Completed,
                "Failed" => JobStatus::Failed,
                "Dead" => JobStatus::Dead,
                "TimedOut" => JobStatus::TimedOut,
                "Retrying" => JobStatus::Retrying,
                "Archived" => JobStatus::Archived,
                _ => panic!("Unknown job status: {}", input),
            };
            assert_eq!(
                *expected, parsed_status,
                "Failed to parse '{}' correctly",
                input
            );
        }
    }

    // Mirrors the Encode impls: statuses are written unquoted.
    #[test]
    fn test_job_status_encoding_logic() {
        let statuses = [
            (JobStatus::Pending, "Pending"),
            (JobStatus::Running, "Running"),
            (JobStatus::Completed, "Completed"),
            (JobStatus::Failed, "Failed"),
            (JobStatus::Dead, "Dead"),
            (JobStatus::TimedOut, "TimedOut"),
            (JobStatus::Retrying, "Retrying"),
            (JobStatus::Archived, "Archived"),
        ];
        for (status, expected_str) in &statuses {
            let encoded_str = match status {
                JobStatus::Pending => "Pending",
                JobStatus::Running => "Running",
                JobStatus::Completed => "Completed",
                JobStatus::Failed => "Failed",
                JobStatus::Dead => "Dead",
                JobStatus::TimedOut => "TimedOut",
                JobStatus::Retrying => "Retrying",
                JobStatus::Archived => "Archived",
            };
            assert_eq!(
                *expected_str, encoded_str,
                "Encoding mismatch for {:?}",
                status
            );
            assert!(
                !encoded_str.starts_with('"'),
                "Encoded string should not start with quotes: {}",
                encoded_str
            );
            assert!(
                !encoded_str.ends_with('"'),
                "Encoded string should not end with quotes: {}",
                encoded_str
            );
        }
    }
}