use crate::email::Email;
use crate::jobs::{Job, JobContext, JobError, JobResult};
use crate::storage::ImageProcessor;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Queue payload for sending a post-signup welcome email to a new user.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WelcomeEmailJob {
/// ID of the newly registered user (used for log correlation).
pub user_id: i64,
/// Destination address the welcome email is sent to.
pub email: String,
/// Display name interpolated into the email subject and body.
pub username: String,
}
#[async_trait]
impl Job for WelcomeEmailJob {
    type Result = ();

    /// Builds and dispatches the welcome email via the context's sender.
    ///
    /// When no email sender is configured this is a successful no-op, so
    /// workers running without SMTP credentials do not fail these jobs.
    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
        tracing::info!(
            user_id = self.user_id,
            email = %self.email,
            username = %self.username,
            "Sending welcome email"
        );

        let sender = match ctx.email_sender() {
            Some(s) => s,
            None => {
                tracing::warn!("Email sender not configured, skipping welcome email");
                return Ok(());
            }
        };

        // Assemble the message parts up front, then build the email.
        let subject = format!("Welcome, {}!", self.username);
        let plain_body = format!(
            "Welcome to our app, {}!\n\nWe're excited to have you on board.",
            self.username
        );
        let html_body = format!(
            "<h1>Welcome to our app, {}!</h1><p>We're excited to have you on board.</p>",
            self.username
        );
        let message = Email::new()
            .to(&self.email)
            .from("noreply@myapp.com")
            .subject(&subject)
            .text(&plain_body)
            .html(&html_body);

        if let Err(e) = sender.send(message).await {
            return Err(JobError::ExecutionFailed(format!(
                "Failed to send welcome email: {e}"
            )));
        }

        tracing::info!(user_id = self.user_id, "Welcome email sent successfully");
        Ok(())
    }

    /// Transient SMTP failures are common; allow a few retries.
    fn max_retries(&self) -> u32 {
        3
    }

    /// A single email send should comfortably finish within this window.
    fn timeout(&self) -> Duration {
        Duration::from_secs(30)
    }

    /// Highest priority in this module: onboarding mail is user-facing.
    fn priority(&self) -> i32 {
        200
    }
}
/// Queue payload for generating a date-ranged report file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GenerateReportJob {
/// Identifier of the report being generated (embedded in the file name).
pub report_id: i64,
/// User who requested the report (logged for traceability).
pub user_id: i64,
/// Report kind; embedded in the output file name.
pub report_type: String,
/// Range start, expected format `YYYY-MM-DD` (parsed in `execute`).
pub start_date: String,
/// Range end, expected format `YYYY-MM-DD` (parsed in `execute`).
pub end_date: String,
}
#[async_trait]
impl Job for GenerateReportJob {
    type Result = String;

    /// Generates a report for the given date range and returns the path of
    /// the produced PDF file.
    ///
    /// # Errors
    /// Fails when either date is not `YYYY-MM-DD`, when the range is
    /// inverted (`start_date > end_date`), when no database pool is
    /// configured, or when the database query fails.
    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
        tracing::info!(
            report_id = self.report_id,
            user_id = self.user_id,
            report_type = %self.report_type,
            start_date = %self.start_date,
            end_date = %self.end_date,
            "Generating report"
        );

        // Validate the payload before touching external resources so a
        // malformed job fails fast (and with the right error) regardless
        // of database availability.
        let start_date = chrono::NaiveDate::parse_from_str(&self.start_date, "%Y-%m-%d")
            .map_err(|e| JobError::ExecutionFailed(format!("Invalid start date: {e}")))?;
        let end_date = chrono::NaiveDate::parse_from_str(&self.end_date, "%Y-%m-%d")
            .map_err(|e| JobError::ExecutionFailed(format!("Invalid end date: {e}")))?;
        if start_date > end_date {
            return Err(JobError::ExecutionFailed(format!(
                "Invalid date range: start date {start_date} is after end date {end_date}"
            )));
        }

        let Some(db_pool) = ctx.database_pool() else {
            return Err(JobError::ExecutionFailed(
                "Database pool not configured".to_string(),
            ));
        };
        // Placeholder query: counts public tables as a stand-in for the
        // real report data. Replace with the actual report query once the
        // schema is wired up.
        let row_count = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM pg_tables WHERE schemaname = 'public'"
        )
        .fetch_one(db_pool.as_ref())
        .await
        .map_err(|e| JobError::ExecutionFailed(format!("Database query failed: {e}")))?;
        tracing::debug!(
            report_id = self.report_id,
            rows = row_count,
            "Retrieved report data from database"
        );

        // Simulated generation work with periodic progress logging.
        for i in 1..=10 {
            tracing::debug!(
                report_id = self.report_id,
                progress = i * 10,
                "Report generation progress"
            );
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        // Timestamp suffix keeps repeated runs of the same report distinct.
        let file_path = format!(
            "/var/reports/{}_{}_{}.pdf",
            self.report_type,
            self.report_id,
            chrono::Utc::now().timestamp()
        );
        tracing::info!(
            report_id = self.report_id,
            file_path = %file_path,
            date_range = format!("{} to {}", start_date, end_date),
            rows_processed = row_count,
            "Report generated successfully"
        );
        Ok(file_path)
    }

    /// Reports are expensive; retry at most once.
    fn max_retries(&self) -> u32 {
        1
    }

    /// Large reports can take a while; allow up to ten minutes.
    fn timeout(&self) -> Duration {
        Duration::from_secs(600)
    }

    fn priority(&self) -> i32 {
        64
    }
}
/// Queue payload for batched deletion of old rows from a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupOldDataJob {
/// Target table; validated against alphanumerics/underscores before use.
pub table_name: String,
/// Age threshold in days used to compute the deletion cutoff date.
pub days_old: u32,
/// Requested records per deletion batch (capped at 1000 in `execute`).
pub batch_size: usize,
/// When true, only logs what would be deleted without touching data.
pub dry_run: bool,
}
impl CleanupOldDataJob {
    /// Validates that `table_name` is safe to use as an SQL identifier.
    ///
    /// Rejects empty names and any character outside ASCII alphanumerics
    /// and underscores, guarding against SQL injection via job payloads.
    fn validate_table_name(&self) -> Result<(), JobError> {
        if self.table_name.is_empty() {
            return Err(JobError::ExecutionFailed(
                "Table name cannot be empty".to_string(),
            ));
        }
        // `is_ascii_alphanumeric` rather than `is_alphanumeric`: the latter
        // accepts any Unicode letter/digit, which is too permissive for an
        // identifier whitelist meant as an injection guard.
        if !self
            .table_name
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_')
        {
            return Err(JobError::ExecutionFailed(format!(
                "Invalid table name '{}': only alphanumeric and underscores allowed",
                self.table_name
            )));
        }
        Ok(())
    }
}
#[async_trait]
impl Job for CleanupOldDataJob {
    type Result = usize;

    /// Deletes (or, in dry-run mode, only counts) records older than
    /// `days_old` days from `table_name`, in batches of `batch_size`
    /// (capped at 1000). Returns the total number of affected records.
    ///
    /// NOTE(review): the non-dry-run branch currently only verifies
    /// connectivity (`SELECT 1`) and simulates the batch loop — no rows
    /// are actually deleted. Confirm before relying on it in production.
    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
        self.validate_table_name()?;
        tracing::info!(
            table = %self.table_name,
            days_old = self.days_old,
            batch_size = self.batch_size,
            dry_run = self.dry_run,
            "Starting data cleanup"
        );

        let cutoff_date =
            chrono::Utc::now() - chrono::Duration::days(i64::from(self.days_old));
        // Loop-invariant: records handled per batch, capped at 1000.
        let per_batch = self.batch_size.min(1000);
        let mut total_deleted = 0_usize;

        if self.dry_run {
            tracing::info!(
                table = %self.table_name,
                cutoff = %cutoff_date,
                "DRY RUN: Would delete records older than cutoff"
            );
            for batch in 1..=5 {
                tracing::info!(
                    batch = batch,
                    count = per_batch,
                    "DRY RUN: Would delete {} records",
                    per_batch
                );
                total_deleted += per_batch;
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        } else {
            let Some(db_pool) = ctx.database_pool() else {
                return Err(JobError::ExecutionFailed(
                    "Database pool not configured".to_string(),
                ));
            };
            tracing::info!(
                table = %self.table_name,
                cutoff = %cutoff_date,
                "Executing batch deletions"
            );
            // Connectivity probe before the (simulated) deletion loop.
            sqlx::query("SELECT 1")
                .execute(db_pool.as_ref())
                .await
                .map_err(|e| {
                    JobError::ExecutionFailed(format!("Database connection failed: {e}"))
                })?;
            for batch in 1..=5 {
                tracing::info!(
                    batch = batch,
                    deleted = per_batch,
                    "Deleted batch of records"
                );
                total_deleted += per_batch;
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }

        tracing::info!(
            total_deleted = total_deleted,
            table = %self.table_name,
            dry_run = self.dry_run,
            "Data cleanup completed"
        );
        Ok(total_deleted)
    }

    /// Never retry: a partially applied cleanup must not repeat blindly.
    fn max_retries(&self) -> u32 {
        0
    }

    /// Bulk deletion may run long; allow thirty minutes.
    fn timeout(&self) -> Duration {
        Duration::from_secs(1800)
    }

    /// Lowest priority in this module: background housekeeping only.
    fn priority(&self) -> i32 {
        32
    }
}
/// Queue payload for generating thumbnails (and optionally an
/// EXIF-stripped copy) of a stored image.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessImageJob {
/// Image identifier (used for file naming and log correlation).
pub image_id: i64,
/// Storage key of the original image to retrieve.
pub storage_id: String,
/// Thumbnail sizes to generate; each is passed as both width and height.
pub sizes: Vec<u32>,
/// When true, also stores an EXIF-stripped copy of the original.
pub optimize: bool,
}
#[async_trait]
impl Job for ProcessImageJob {
    type Result = Vec<String>;

    /// Generates one square thumbnail per entry in `self.sizes` and, when
    /// `optimize` is set, stores an EXIF-stripped copy of the original.
    /// Returns the storage ids of the thumbnails, in the same order as
    /// `self.sizes`.
    ///
    /// # Errors
    /// Fails when file storage is not configured, or when retrieving,
    /// resizing, EXIF-stripping, or storing any file fails.
    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
        tracing::info!(
            image_id = self.image_id,
            storage_id = %self.storage_id,
            sizes = ?self.sizes,
            optimize = self.optimize,
            "Processing image"
        );
        let Some(file_storage) = ctx.file_storage() else {
            return Err(JobError::ExecutionFailed(
                "File storage not configured".to_string(),
            ));
        };
        let image_data = file_storage
            .retrieve(&self.storage_id)
            .await
            .map_err(|e| JobError::ExecutionFailed(format!("Failed to retrieve image: {e}")))?;
        tracing::debug!(
            image_id = self.image_id,
            size_bytes = image_data.len(),
            "Retrieved original image"
        );
        // NOTE(review): the content type is hardcoded as JPEG regardless of
        // the stored file's actual format — confirm against the uploader.
        let original_file = crate::storage::UploadedFile::new(
            format!("{}.jpg", self.image_id),
            "image/jpeg",
            image_data,
        );
        let processor = ImageProcessor::new();
        // One stored id per requested size; preallocate accordingly.
        let mut thumbnail_storage_ids = Vec::with_capacity(self.sizes.len());
        for size in &self.sizes {
            tracing::debug!(
                image_id = self.image_id,
                size = size,
                "Generating thumbnail"
            );
            let thumbnail_file = processor
                .resize(&original_file, *size, *size)
                .map_err(|e| {
                    JobError::ExecutionFailed(format!("Failed to resize image: {e}"))
                })?;
            let stored = file_storage.store(thumbnail_file).await.map_err(|e| {
                JobError::ExecutionFailed(format!("Failed to store thumbnail: {e}"))
            })?;
            thumbnail_storage_ids.push(stored.id.clone());
            tracing::debug!(
                image_id = self.image_id,
                size = size,
                storage_id = %stored.id,
                "Thumbnail generated and stored"
            );
        }
        if self.optimize {
            tracing::debug!(
                image_id = self.image_id,
                "Optimizing original image (stripping EXIF metadata)"
            );
            let optimized_file = processor
                .strip_exif(&original_file)
                .map_err(|e| JobError::ExecutionFailed(format!("Failed to optimize: {e}")))?;
            // Fix: the optimized copy is stored under a NEW storage id that
            // was previously discarded, leaving the file untraceable. Capture
            // and log it. NOTE(review): it should probably also be persisted
            // against `image_id` — confirm the intended consumer.
            let stored_optimized = file_storage.store(optimized_file).await.map_err(|e| {
                JobError::ExecutionFailed(format!("Failed to store optimized image: {e}"))
            })?;
            tracing::debug!(
                image_id = self.image_id,
                storage_id = %stored_optimized.id,
                "Original image optimized (EXIF stripped) and stored"
            );
        }
        tracing::info!(
            image_id = self.image_id,
            thumbnails = thumbnail_storage_ids.len(),
            optimized = self.optimize,
            "Image processing completed"
        );
        Ok(thumbnail_storage_ids)
    }

    /// Storage hiccups are usually transient; allow two retries.
    fn max_retries(&self) -> u32 {
        2
    }

    fn timeout(&self) -> Duration {
        Duration::from_secs(120)
    }

    fn priority(&self) -> i32 {
        128
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- shared fixtures used by the static-property tests ----------------

    fn sample_welcome() -> WelcomeEmailJob {
        WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        }
    }

    fn sample_report() -> GenerateReportJob {
        GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        }
    }

    fn sample_cleanup() -> CleanupOldDataJob {
        CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        }
    }

    fn sample_image() -> ProcessImageJob {
        ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        }
    }

    // ---- execute() behaviour ----------------------------------------------

    #[tokio::test]
    async fn test_welcome_email_job_without_sender() {
        // Without a configured sender the job is a successful no-op.
        let ctx = JobContext::new();
        let job = WelcomeEmailJob {
            user_id: 123,
            email: "test@example.com".to_string(),
            username: "testuser".to_string(),
        };
        assert!(job.execute(&ctx).await.is_ok());
    }

    #[tokio::test]
    async fn test_generate_report_job_without_database() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let err = job.execute(&ctx).await.unwrap_err();
        assert!(err.to_string().contains("Database pool not configured"));
    }

    #[tokio::test]
    async fn test_generate_report_job_invalid_dates() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "invalid-date".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        assert!(job.execute(&ctx).await.is_err());
    }

    #[tokio::test]
    async fn test_cleanup_job_dry_run() {
        let ctx = JobContext::new();
        // table "events", 90 days, batches of 100, dry_run = true.
        let job = sample_cleanup();
        let deleted = job.execute(&ctx).await.expect("dry run should succeed");
        // 5 simulated batches x min(100, 1000) records each.
        assert_eq!(deleted, 500);
    }

    #[tokio::test]
    async fn test_cleanup_job_validates_table_name() {
        let ctx = JobContext::new();

        let injection = CleanupOldDataJob {
            table_name: "events; DROP TABLE users;".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };
        let err = injection.execute(&ctx).await.unwrap_err();
        assert!(err
            .to_string()
            .contains("only alphanumeric and underscores allowed"));

        let empty = CleanupOldDataJob {
            table_name: String::new(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };
        let err = empty.execute(&ctx).await.unwrap_err();
        assert!(err.to_string().contains("cannot be empty"));
    }

    #[tokio::test]
    async fn test_process_image_job_without_storage() {
        let ctx = JobContext::new();
        let job = ProcessImageJob {
            image_id: 789,
            storage_id: "test-123".to_string(),
            sizes: vec![200, 400],
            optimize: true,
        };
        let err = job.execute(&ctx).await.unwrap_err();
        assert!(err.to_string().contains("File storage not configured"));
    }

    // ---- static job properties --------------------------------------------

    #[test]
    fn test_job_priorities() {
        // Expected ordering: welcome > image > report > cleanup.
        assert!(sample_welcome().priority() > sample_image().priority());
        assert!(sample_image().priority() > sample_report().priority());
        assert!(sample_report().priority() > sample_cleanup().priority());
    }

    #[test]
    fn test_job_retry_counts() {
        assert_eq!(sample_welcome().max_retries(), 3);
        assert_eq!(sample_report().max_retries(), 1);
        assert_eq!(sample_cleanup().max_retries(), 0);
        assert_eq!(sample_image().max_retries(), 2);
    }

    #[test]
    fn test_job_timeouts() {
        assert_eq!(sample_welcome().timeout(), Duration::from_secs(30));
        assert_eq!(sample_report().timeout(), Duration::from_secs(600));
        assert_eq!(sample_cleanup().timeout(), Duration::from_secs(1800));
        assert_eq!(sample_image().timeout(), Duration::from_secs(120));
    }
}