1use crate::email::Email;
8use crate::jobs::{Job, JobContext, JobError, JobResult};
9use crate::storage::ImageProcessor;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::time::Duration;
13
/// Payload for the background job that sends a welcome email to a newly
/// registered user.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WelcomeEmailJob {
    /// ID of the newly registered user (used for logging/tracing).
    pub user_id: i64,
    /// Recipient email address.
    pub email: String,
    /// Name interpolated into the subject and greeting of the email body.
    pub username: String,
}
42
43#[async_trait]
44impl Job for WelcomeEmailJob {
45 type Result = ();
46
47 async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
48 tracing::info!(
49 user_id = self.user_id,
50 email = %self.email,
51 username = %self.username,
52 "Sending welcome email"
53 );
54
55 let Some(email_sender) = ctx.email_sender() else {
57 tracing::warn!("Email sender not configured, skipping welcome email");
58 return Ok(());
59 };
60
61 let email = Email::new()
63 .to(&self.email)
64 .from("noreply@myapp.com")
65 .subject(&format!("Welcome, {}!", self.username))
66 .text(&format!(
67 "Welcome to our app, {}!\n\nWe're excited to have you on board.",
68 self.username
69 ))
70 .html(&format!(
71 "<h1>Welcome to our app, {}!</h1><p>We're excited to have you on board.</p>",
72 self.username
73 ));
74
75 email_sender
76 .send(email)
77 .await
78 .map_err(|e| JobError::ExecutionFailed(format!("Failed to send welcome email: {e}")))?;
79
80 tracing::info!(
81 user_id = self.user_id,
82 "Welcome email sent successfully"
83 );
84
85 Ok(())
86 }
87
88 fn max_retries(&self) -> u32 {
89 3 }
91
92 fn timeout(&self) -> Duration {
93 Duration::from_secs(30) }
95
96 fn priority(&self) -> i32 {
97 200 }
99}
100
/// Payload for generating a report over a date range.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GenerateReportJob {
    /// ID of the report record; interpolated into the output file name.
    pub report_id: i64,
    /// User who requested the report (logged for traceability).
    pub user_id: i64,
    /// Report type; interpolated into the output file name.
    pub report_type: String,
    /// Start of the reporting range, in `%Y-%m-%d` format.
    pub start_date: String,
    /// End of the reporting range, in `%Y-%m-%d` format.
    pub end_date: String,
}
135
136#[async_trait]
137impl Job for GenerateReportJob {
138 type Result = String; async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
141 tracing::info!(
142 report_id = self.report_id,
143 user_id = self.user_id,
144 report_type = %self.report_type,
145 start_date = %self.start_date,
146 end_date = %self.end_date,
147 "Generating report"
148 );
149
150 let Some(db_pool) = ctx.database_pool() else {
152 return Err(JobError::ExecutionFailed(
153 "Database pool not configured".to_string(),
154 ));
155 };
156
157 let start_date = chrono::NaiveDate::parse_from_str(&self.start_date, "%Y-%m-%d")
159 .map_err(|e| JobError::ExecutionFailed(format!("Invalid start date: {e}")))?;
160 let end_date = chrono::NaiveDate::parse_from_str(&self.end_date, "%Y-%m-%d")
161 .map_err(|e| JobError::ExecutionFailed(format!("Invalid end date: {e}")))?;
162
163 let row_count = sqlx::query_scalar::<_, i64>(
166 "SELECT COUNT(*) FROM pg_tables WHERE schemaname = 'public'"
167 )
168 .fetch_one(db_pool.as_ref())
169 .await
170 .map_err(|e| JobError::ExecutionFailed(format!("Database query failed: {e}")))?;
171
172 tracing::debug!(
173 report_id = self.report_id,
174 rows = row_count,
175 "Retrieved report data from database"
176 );
177
178 for i in 1..=10 {
180 tracing::debug!(
181 report_id = self.report_id,
182 progress = i * 10,
183 "Report generation progress"
184 );
185 tokio::time::sleep(Duration::from_millis(100)).await;
186 }
187
188 let file_path = format!(
189 "/var/reports/{}_{}_{}.pdf",
190 self.report_type,
191 self.report_id,
192 chrono::Utc::now().timestamp()
193 );
194
195 tracing::info!(
196 report_id = self.report_id,
197 file_path = %file_path,
198 date_range = format!("{} to {}", start_date, end_date),
199 rows_processed = row_count,
200 "Report generated successfully"
201 );
202
203 Ok(file_path)
204 }
205
206 fn max_retries(&self) -> u32 {
207 1 }
209
210 fn timeout(&self) -> Duration {
211 Duration::from_secs(600) }
213
214 fn priority(&self) -> i32 {
215 64 }
217}
218
/// Payload for batch-deleting old rows from a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupOldDataJob {
    /// Target table; validated to be non-empty and contain only
    /// alphanumerics/underscores before any work is done.
    pub table_name: String,
    /// Rows older than this many days (from now) are eligible for cleanup.
    pub days_old: u32,
    /// Requested rows per deletion batch (capped at 1000 in `execute`).
    pub batch_size: usize,
    /// When true, only log what would be deleted — no database writes.
    pub dry_run: bool,
}
250
251impl CleanupOldDataJob {
252 fn validate_table_name(&self) -> Result<(), JobError> {
256 if self.table_name.is_empty() {
257 return Err(JobError::ExecutionFailed(
258 "Table name cannot be empty".to_string(),
259 ));
260 }
261
262 if !self
263 .table_name
264 .chars()
265 .all(|c| c.is_alphanumeric() || c == '_')
266 {
267 return Err(JobError::ExecutionFailed(format!(
268 "Invalid table name '{}': only alphanumeric and underscores allowed",
269 self.table_name
270 )));
271 }
272
273 Ok(())
274 }
275}
276
277#[async_trait]
278impl Job for CleanupOldDataJob {
279 type Result = usize; async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
282 self.validate_table_name()?;
284
285 tracing::info!(
286 table = %self.table_name,
287 days_old = self.days_old,
288 batch_size = self.batch_size,
289 dry_run = self.dry_run,
290 "Starting data cleanup"
291 );
292
293 let cutoff_date =
294 chrono::Utc::now() - chrono::Duration::days(i64::from(self.days_old));
295
296 let mut total_deleted = 0_usize;
299
300 if self.dry_run {
301 tracing::info!(
303 table = %self.table_name,
304 cutoff = %cutoff_date,
305 "DRY RUN: Would delete records older than cutoff"
306 );
307
308 for batch in 1..=5 {
310 let batch_count = self.batch_size.min(1000);
311 tracing::info!(
312 batch = batch,
313 count = batch_count,
314 "DRY RUN: Would delete {} records",
315 batch_count
316 );
317 total_deleted += batch_count;
318 tokio::time::sleep(Duration::from_millis(100)).await;
319 }
320 } else {
321 let Some(db_pool) = ctx.database_pool() else {
323 return Err(JobError::ExecutionFailed(
324 "Database pool not configured".to_string(),
325 ));
326 };
327 tracing::info!(
330 table = %self.table_name,
331 cutoff = %cutoff_date,
332 "Executing batch deletions"
333 );
334
335 sqlx::query("SELECT 1")
337 .execute(db_pool.as_ref())
338 .await
339 .map_err(|e| {
340 JobError::ExecutionFailed(format!("Database connection failed: {e}"))
341 })?;
342
343 for batch in 1..=5 {
366 let batch_count = self.batch_size.min(1000);
367 tracing::info!(
368 batch = batch,
369 deleted = batch_count,
370 "Deleted batch of records"
371 );
372 total_deleted += batch_count;
373 tokio::time::sleep(Duration::from_millis(100)).await;
374 }
375 }
376
377 tracing::info!(
378 total_deleted = total_deleted,
379 table = %self.table_name,
380 dry_run = self.dry_run,
381 "Data cleanup completed"
382 );
383
384 Ok(total_deleted)
385 }
386
387 fn max_retries(&self) -> u32 {
388 0 }
390
391 fn timeout(&self) -> Duration {
392 Duration::from_secs(1800) }
394
395 fn priority(&self) -> i32 {
396 32 }
398}
399
/// Payload for generating thumbnails (and optionally an optimized copy)
/// of a stored image.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessImageJob {
    /// ID of the image record; used for logging and the derived file name.
    pub image_id: i64,
    /// Storage key of the original image in the file store.
    pub storage_id: String,
    /// Edge lengths of the square thumbnails to generate.
    pub sizes: Vec<u32>,
    /// When true, also store an EXIF-stripped copy of the original.
    pub optimize: bool,
}
431
432#[async_trait]
433impl Job for ProcessImageJob {
434 type Result = Vec<String>; async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
437 tracing::info!(
438 image_id = self.image_id,
439 storage_id = %self.storage_id,
440 sizes = ?self.sizes,
441 optimize = self.optimize,
442 "Processing image"
443 );
444
445 let Some(file_storage) = ctx.file_storage() else {
447 return Err(JobError::ExecutionFailed(
448 "File storage not configured".to_string(),
449 ));
450 };
451
452 let image_data = file_storage
454 .retrieve(&self.storage_id)
455 .await
456 .map_err(|e| JobError::ExecutionFailed(format!("Failed to retrieve image: {e}")))?;
457
458 tracing::debug!(
459 image_id = self.image_id,
460 size_bytes = image_data.len(),
461 "Retrieved original image"
462 );
463
464 let original_file = crate::storage::UploadedFile::new(
466 format!("{}.jpg", self.image_id),
467 "image/jpeg",
468 image_data,
469 );
470
471 let processor = ImageProcessor::new();
473
474 let mut thumbnail_storage_ids = Vec::new();
475
476 for size in &self.sizes {
478 tracing::debug!(
479 image_id = self.image_id,
480 size = size,
481 "Generating thumbnail"
482 );
483
484 let thumbnail_file = processor
486 .resize(&original_file, *size, *size)
487 .map_err(|e| {
488 JobError::ExecutionFailed(format!("Failed to resize image: {e}"))
489 })?;
490
491 let stored = file_storage.store(thumbnail_file).await.map_err(|e| {
493 JobError::ExecutionFailed(format!("Failed to store thumbnail: {e}"))
494 })?;
495
496 thumbnail_storage_ids.push(stored.id.clone());
497
498 tracing::debug!(
499 image_id = self.image_id,
500 size = size,
501 storage_id = %stored.id,
502 "Thumbnail generated and stored"
503 );
504 }
505
506 if self.optimize {
508 tracing::debug!(
509 image_id = self.image_id,
510 "Optimizing original image (stripping EXIF metadata)"
511 );
512
513 let optimized_file = processor
514 .strip_exif(&original_file)
515 .map_err(|e| JobError::ExecutionFailed(format!("Failed to optimize: {e}")))?;
516
517 file_storage.store(optimized_file).await.map_err(|e| {
519 JobError::ExecutionFailed(format!("Failed to store optimized image: {e}"))
520 })?;
521
522 tracing::debug!(
523 image_id = self.image_id,
524 "Original image optimized (EXIF stripped) and stored"
525 );
526 }
527
528 tracing::info!(
529 image_id = self.image_id,
530 thumbnails = thumbnail_storage_ids.len(),
531 optimized = self.optimize,
532 "Image processing completed"
533 );
534
535 Ok(thumbnail_storage_ids)
536 }
537
538 fn max_retries(&self) -> u32 {
539 2 }
541
542 fn timeout(&self) -> Duration {
543 Duration::from_secs(120) }
545
546 fn priority(&self) -> i32 {
547 128 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    // A missing email sender is a soft skip: the job succeeds even with an
    // empty JobContext.
    #[tokio::test]
    async fn test_welcome_email_job_without_sender() {
        let ctx = JobContext::new();
        let job = WelcomeEmailJob {
            user_id: 123,
            email: "test@example.com".to_string(),
            username: "testuser".to_string(),
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_ok());
    }

    // Unlike the email sender, a missing database pool is a hard failure
    // for report generation.
    #[tokio::test]
    async fn test_generate_report_job_without_database() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Database pool not configured"));
    }

    // Dates that do not parse as %Y-%m-%d must be rejected.
    // NOTE(review): with an empty JobContext this errors on the missing
    // database pool before date parsing is reached; the assertion still
    // holds either way.
    #[tokio::test]
    async fn test_generate_report_job_invalid_dates() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "invalid-date".to_string(),
            end_date: "2025-01-31".to_string(),
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
    }

    // Dry run performs 5 simulated batches of min(batch_size, 1000) = 100
    // rows each, so the reported total is 500 and no database is needed.
    #[tokio::test]
    async fn test_cleanup_job_dry_run() {
        let ctx = JobContext::new();
        let job = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 500); }

    // Table-name validation runs before any other work: injection-style
    // and empty names fail fast with descriptive errors.
    #[tokio::test]
    async fn test_cleanup_job_validates_table_name() {
        let ctx = JobContext::new();

        // Characters outside [alphanumeric, '_'] are rejected.
        let job = CleanupOldDataJob {
            table_name: "events; DROP TABLE users;".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("only alphanumeric and underscores allowed"));

        // An empty table name is rejected with its own error message.
        let job = CleanupOldDataJob {
            table_name: String::new(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("cannot be empty"));
    }

    // Image processing requires file storage; a missing store is a hard
    // failure.
    #[tokio::test]
    async fn test_process_image_job_without_storage() {
        let ctx = JobContext::new();
        let job = ProcessImageJob {
            image_id: 789,
            storage_id: "test-123".to_string(),
            sizes: vec![200, 400],
            optimize: true,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("File storage not configured"));
    }

    // Pins the relative priority ordering:
    // welcome (200) > image (128) > report (64) > cleanup (32).
    #[test]
    fn test_job_priorities() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        assert!(welcome.priority() > image.priority());
        assert!(image.priority() > report.priority());
        assert!(report.priority() > cleanup.priority());
    }

    // Pins the exact retry budget of each job type.
    #[test]
    fn test_job_retry_counts() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        assert_eq!(welcome.max_retries(), 3);
        assert_eq!(report.max_retries(), 1);
        assert_eq!(cleanup.max_retries(), 0);
        assert_eq!(image.max_retries(), 2);
    }

    // Pins the exact timeout of each job type.
    #[test]
    fn test_job_timeouts() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        assert_eq!(welcome.timeout(), Duration::from_secs(30));
        assert_eq!(report.timeout(), Duration::from_secs(600));
        assert_eq!(cleanup.timeout(), Duration::from_secs(1800));
        assert_eq!(image.timeout(), Duration::from_secs(120));
    }
}