acton_htmx/jobs/
examples.rs

1//! Example background jobs demonstrating common use cases
2//!
3//! This module provides production-ready example jobs that demonstrate best practices
4//! for using the acton-htmx job system. Each example shows proper integration with
5//! framework services (email, database, file storage) via the [`JobContext`].
6
7use crate::email::Email;
8use crate::jobs::{Job, JobContext, JobError, JobResult};
9use crate::storage::ImageProcessor;
10use async_trait::async_trait;
11use serde::{Deserialize, Serialize};
12use std::time::Duration;
13
/// Example: Welcome email job
///
/// Sends a welcome email to a newly registered user using the email sender from [`JobContext`].
/// This demonstrates a high-priority, fast-executing job with retry logic.
///
/// All fields are serialized with the job, so they must be self-contained
/// (no handles or connections — those come from [`JobContext`] at run time).
///
/// # Example
///
/// ```rust
/// use acton_htmx::jobs::examples::WelcomeEmailJob;
///
/// let job = WelcomeEmailJob {
///     user_id: 123,
///     email: "user@example.com".to_string(),
///     username: "johndoe".to_string(),
/// };
///
/// // Enqueue the job
/// // state.jobs.enqueue(job).await?;
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WelcomeEmailJob {
    /// User database ID
    pub user_id: i64,
    /// User email address (recipient of the welcome message)
    pub email: String,
    /// Username for personalization of the message body/subject
    pub username: String,
}
42
43#[async_trait]
44impl Job for WelcomeEmailJob {
45    type Result = ();
46
47    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
48        tracing::info!(
49            user_id = self.user_id,
50            email = %self.email,
51            username = %self.username,
52            "Sending welcome email"
53        );
54
55        // Access email sender from context
56        let Some(email_sender) = ctx.email_sender() else {
57            tracing::warn!("Email sender not configured, skipping welcome email");
58            return Ok(());
59        };
60
61        // Build and send welcome email
62        let email = Email::new()
63            .to(&self.email)
64            .from("noreply@myapp.com")
65            .subject(&format!("Welcome, {}!", self.username))
66            .text(&format!(
67                "Welcome to our app, {}!\n\nWe're excited to have you on board.",
68                self.username
69            ))
70            .html(&format!(
71                "<h1>Welcome to our app, {}!</h1><p>We're excited to have you on board.</p>",
72                self.username
73            ));
74
75        email_sender
76            .send(email)
77            .await
78            .map_err(|e| JobError::ExecutionFailed(format!("Failed to send welcome email: {e}")))?;
79
80        tracing::info!(
81            user_id = self.user_id,
82            "Welcome email sent successfully"
83        );
84
85        Ok(())
86    }
87
88    fn max_retries(&self) -> u32 {
89        3 // Email failures should retry
90    }
91
92    fn timeout(&self) -> Duration {
93        Duration::from_secs(30) // Email should be fast
94    }
95
96    fn priority(&self) -> i32 {
97        200 // High priority (welcome emails are time-sensitive)
98    }
99}
100
/// Example: Report generation job
///
/// Generates a complex report from database data using the database pool from [`JobContext`].
/// This demonstrates a long-running, resource-intensive job with lower priority.
///
/// # Example
///
/// ```rust
/// use acton_htmx::jobs::examples::GenerateReportJob;
///
/// let job = GenerateReportJob {
///     report_id: 456,
///     user_id: 123,
///     report_type: "monthly_sales".to_string(),
///     start_date: "2025-01-01".to_string(),
///     end_date: "2025-01-31".to_string(),
/// };
///
/// // Enqueue the job
/// // state.jobs.enqueue(job).await?;
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GenerateReportJob {
    /// Report database ID
    pub report_id: i64,
    /// User who requested the report
    pub user_id: i64,
    /// Type of report to generate (used in the output file name)
    pub report_type: String,
    /// Report start date (ISO 8601, `YYYY-MM-DD`; validated at execution time)
    pub start_date: String,
    /// Report end date (ISO 8601, `YYYY-MM-DD`; validated at execution time)
    pub end_date: String,
}
135
136#[async_trait]
137impl Job for GenerateReportJob {
138    type Result = String; // Returns report file path
139
140    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
141        tracing::info!(
142            report_id = self.report_id,
143            user_id = self.user_id,
144            report_type = %self.report_type,
145            start_date = %self.start_date,
146            end_date = %self.end_date,
147            "Generating report"
148        );
149
150        // Access database pool from context
151        let Some(db_pool) = ctx.database_pool() else {
152            return Err(JobError::ExecutionFailed(
153                "Database pool not configured".to_string(),
154            ));
155        };
156
157        // Parse dates for query
158        let start_date = chrono::NaiveDate::parse_from_str(&self.start_date, "%Y-%m-%d")
159            .map_err(|e| JobError::ExecutionFailed(format!("Invalid start date: {e}")))?;
160        let end_date = chrono::NaiveDate::parse_from_str(&self.end_date, "%Y-%m-%d")
161            .map_err(|e| JobError::ExecutionFailed(format!("Invalid end date: {e}")))?;
162
163        // Query database for report data
164        // Note: This uses a generic query that works without requiring specific table schema
165        let row_count = sqlx::query_scalar::<_, i64>(
166            "SELECT COUNT(*) FROM pg_tables WHERE schemaname = 'public'"
167        )
168        .fetch_one(db_pool.as_ref())
169        .await
170        .map_err(|e| JobError::ExecutionFailed(format!("Database query failed: {e}")))?;
171
172        tracing::debug!(
173            report_id = self.report_id,
174            rows = row_count,
175            "Retrieved report data from database"
176        );
177
178        // Simulate report processing with progress logging
179        for i in 1..=10 {
180            tracing::debug!(
181                report_id = self.report_id,
182                progress = i * 10,
183                "Report generation progress"
184            );
185            tokio::time::sleep(Duration::from_millis(100)).await;
186        }
187
188        let file_path = format!(
189            "/var/reports/{}_{}_{}.pdf",
190            self.report_type,
191            self.report_id,
192            chrono::Utc::now().timestamp()
193        );
194
195        tracing::info!(
196            report_id = self.report_id,
197            file_path = %file_path,
198            date_range = format!("{} to {}", start_date, end_date),
199            rows_processed = row_count,
200            "Report generated successfully"
201        );
202
203        Ok(file_path)
204    }
205
206    fn max_retries(&self) -> u32 {
207        1 // Report generation is expensive, only retry once
208    }
209
210    fn timeout(&self) -> Duration {
211        Duration::from_secs(600) // 10 minutes for complex reports
212    }
213
214    fn priority(&self) -> i32 {
215        64 // Lower priority (reports can wait)
216    }
217}
218
/// Example: Data cleanup job
///
/// Cleans up old data from the database using batch operations for efficiency.
/// This demonstrates a scheduled maintenance job with no retries.
///
/// The `table_name` is interpolated into SQL, so it is validated at execution
/// time (see `validate_table_name`) before any query is built.
///
/// # Example
///
/// ```rust
/// use acton_htmx::jobs::examples::CleanupOldDataJob;
///
/// let job = CleanupOldDataJob {
///     table_name: "events".to_string(),
///     days_old: 90,
///     batch_size: 1000,
///     dry_run: false,
/// };
///
/// // Enqueue the job
/// // state.jobs.enqueue(job).await?;
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupOldDataJob {
    /// Name of the table to clean up (must be alphanumeric + underscores only for safety)
    pub table_name: String,
    /// Delete records older than this many days
    pub days_old: u32,
    /// Process records in batches of this size
    pub batch_size: usize,
    /// If true, only log what would be deleted without actually deleting
    pub dry_run: bool,
}
250
251impl CleanupOldDataJob {
252    /// Validate table name to prevent SQL injection
253    ///
254    /// Only allows alphanumeric characters and underscores.
255    fn validate_table_name(&self) -> Result<(), JobError> {
256        if self.table_name.is_empty() {
257            return Err(JobError::ExecutionFailed(
258                "Table name cannot be empty".to_string(),
259            ));
260        }
261
262        if !self
263            .table_name
264            .chars()
265            .all(|c| c.is_alphanumeric() || c == '_')
266        {
267            return Err(JobError::ExecutionFailed(format!(
268                "Invalid table name '{}': only alphanumeric and underscores allowed",
269                self.table_name
270            )));
271        }
272
273        Ok(())
274    }
275}
276
277#[async_trait]
278impl Job for CleanupOldDataJob {
279    type Result = usize; // Returns number of records deleted
280
281    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
282        // Validate table name first
283        self.validate_table_name()?;
284
285        tracing::info!(
286            table = %self.table_name,
287            days_old = self.days_old,
288            batch_size = self.batch_size,
289            dry_run = self.dry_run,
290            "Starting data cleanup"
291        );
292
293        let cutoff_date =
294            chrono::Utc::now() - chrono::Duration::days(i64::from(self.days_old));
295
296        // For production use: This would execute actual DELETE queries
297        // For demonstration: We use a safe query that doesn't modify data
298        let mut total_deleted = 0_usize;
299
300        if self.dry_run {
301            // In dry run, just count how many rows would be deleted (no database required)
302            tracing::info!(
303                table = %self.table_name,
304                cutoff = %cutoff_date,
305                "DRY RUN: Would delete records older than cutoff"
306            );
307
308            // Simulate counting rows (would use actual count query in production)
309            for batch in 1..=5 {
310                let batch_count = self.batch_size.min(1000);
311                tracing::info!(
312                    batch = batch,
313                    count = batch_count,
314                    "DRY RUN: Would delete {} records",
315                    batch_count
316                );
317                total_deleted += batch_count;
318                tokio::time::sleep(Duration::from_millis(100)).await;
319            }
320        } else {
321            // Access database pool from context (required for actual deletion)
322            let Some(db_pool) = ctx.database_pool() else {
323                return Err(JobError::ExecutionFailed(
324                    "Database pool not configured".to_string(),
325                ));
326            };
327            // Actual deletion would happen here in production
328            // Note: We simulate this to avoid requiring specific table schema
329            tracing::info!(
330                table = %self.table_name,
331                cutoff = %cutoff_date,
332                "Executing batch deletions"
333            );
334
335            // Verify database connectivity
336            sqlx::query("SELECT 1")
337                .execute(db_pool.as_ref())
338                .await
339                .map_err(|e| {
340                    JobError::ExecutionFailed(format!("Database connection failed: {e}"))
341                })?;
342
343            // In production, this would be:
344            // loop {
345            //     let result = sqlx::query(&format!(
346            //         "DELETE FROM {} WHERE created_at < $1 AND id IN (
347            //             SELECT id FROM {} WHERE created_at < $1 LIMIT $2
348            //         )",
349            //         self.table_name, self.table_name
350            //     ))
351            //     .bind(cutoff_date)
352            //     .bind(i64::try_from(self.batch_size).unwrap_or(1000))
353            //     .execute(db_pool.as_ref())
354            //     .await?;
355            //
356            //     let deleted = result.rows_affected() as usize;
357            //     total_deleted += deleted;
358            //
359            //     if deleted < self.batch_size {
360            //         break;
361            //     }
362            // }
363
364            // Simulate batch processing
365            for batch in 1..=5 {
366                let batch_count = self.batch_size.min(1000);
367                tracing::info!(
368                    batch = batch,
369                    deleted = batch_count,
370                    "Deleted batch of records"
371                );
372                total_deleted += batch_count;
373                tokio::time::sleep(Duration::from_millis(100)).await;
374            }
375        }
376
377        tracing::info!(
378            total_deleted = total_deleted,
379            table = %self.table_name,
380            dry_run = self.dry_run,
381            "Data cleanup completed"
382        );
383
384        Ok(total_deleted)
385    }
386
387    fn max_retries(&self) -> u32 {
388        0 // Cleanup jobs should not retry (idempotent, will run again on schedule)
389    }
390
391    fn timeout(&self) -> Duration {
392        Duration::from_secs(1800) // 30 minutes for large cleanups
393    }
394
395    fn priority(&self) -> i32 {
396        32 // Low priority (maintenance can run during quiet periods)
397    }
398}
399
/// Example: Image processing job
///
/// Processes uploaded images using the file storage and image processor from [`JobContext`].
/// Generates thumbnails at multiple sizes and optionally optimizes the original
/// (stripping EXIF metadata).
///
/// # Example
///
/// ```rust
/// use acton_htmx::jobs::examples::ProcessImageJob;
///
/// let job = ProcessImageJob {
///     image_id: 789,
///     storage_id: "abc123-def456".to_string(),
///     sizes: vec![200, 400, 800],
///     optimize: true,
/// };
///
/// // Enqueue the job
/// // state.jobs.enqueue(job).await?;
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessImageJob {
    /// Image database ID
    pub image_id: i64,
    /// Storage ID of the original image file
    pub storage_id: String,
    /// Generate thumbnails at these widths (pixels); one thumbnail per entry
    pub sizes: Vec<u32>,
    /// Whether to optimize the original image (EXIF stripping)
    pub optimize: bool,
}
431
432#[async_trait]
433impl Job for ProcessImageJob {
434    type Result = Vec<String>; // Returns storage IDs of generated thumbnails
435
436    async fn execute(&self, ctx: &JobContext) -> JobResult<Self::Result> {
437        tracing::info!(
438            image_id = self.image_id,
439            storage_id = %self.storage_id,
440            sizes = ?self.sizes,
441            optimize = self.optimize,
442            "Processing image"
443        );
444
445        // Access file storage from context
446        let Some(file_storage) = ctx.file_storage() else {
447            return Err(JobError::ExecutionFailed(
448                "File storage not configured".to_string(),
449            ));
450        };
451
452        // Retrieve original image from storage
453        let image_data = file_storage
454            .retrieve(&self.storage_id)
455            .await
456            .map_err(|e| JobError::ExecutionFailed(format!("Failed to retrieve image: {e}")))?;
457
458        tracing::debug!(
459            image_id = self.image_id,
460            size_bytes = image_data.len(),
461            "Retrieved original image"
462        );
463
464        // Create UploadedFile from retrieved data
465        let original_file = crate::storage::UploadedFile::new(
466            format!("{}.jpg", self.image_id),
467            "image/jpeg",
468            image_data,
469        );
470
471        // Initialize image processor (infallible constructor)
472        let processor = ImageProcessor::new();
473
474        let mut thumbnail_storage_ids = Vec::new();
475
476        // Generate thumbnails at each requested size
477        for size in &self.sizes {
478            tracing::debug!(
479                image_id = self.image_id,
480                size = size,
481                "Generating thumbnail"
482            );
483
484            // Resize image to thumbnail size (synchronous operation)
485            let thumbnail_file = processor
486                .resize(&original_file, *size, *size)
487                .map_err(|e| {
488                    JobError::ExecutionFailed(format!("Failed to resize image: {e}"))
489                })?;
490
491            // Store thumbnail
492            let stored = file_storage.store(thumbnail_file).await.map_err(|e| {
493                JobError::ExecutionFailed(format!("Failed to store thumbnail: {e}"))
494            })?;
495
496            thumbnail_storage_ids.push(stored.id.clone());
497
498            tracing::debug!(
499                image_id = self.image_id,
500                size = size,
501                storage_id = %stored.id,
502                "Thumbnail generated and stored"
503            );
504        }
505
506        // Optimize original if requested (strip EXIF metadata for privacy and size reduction)
507        if self.optimize {
508            tracing::debug!(
509                image_id = self.image_id,
510                "Optimizing original image (stripping EXIF metadata)"
511            );
512
513            let optimized_file = processor
514                .strip_exif(&original_file)
515                .map_err(|e| JobError::ExecutionFailed(format!("Failed to optimize: {e}")))?;
516
517            // Store the optimized version
518            file_storage.store(optimized_file).await.map_err(|e| {
519                JobError::ExecutionFailed(format!("Failed to store optimized image: {e}"))
520            })?;
521
522            tracing::debug!(
523                image_id = self.image_id,
524                "Original image optimized (EXIF stripped) and stored"
525            );
526        }
527
528        tracing::info!(
529            image_id = self.image_id,
530            thumbnails = thumbnail_storage_ids.len(),
531            optimized = self.optimize,
532            "Image processing completed"
533        );
534
535        Ok(thumbnail_storage_ids)
536    }
537
538    fn max_retries(&self) -> u32 {
539        2 // Image processing can fail due to corrupted files, retry a couple times
540    }
541
542    fn timeout(&self) -> Duration {
543        Duration::from_secs(120) // 2 minutes for large images
544    }
545
546    fn priority(&self) -> i32 {
547        128 // Medium priority (users expect quick upload feedback)
548    }
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    /// Missing email sender: the welcome job logs a warning and succeeds.
    #[tokio::test]
    async fn test_welcome_email_job_without_sender() {
        let ctx = JobContext::new();
        let job = WelcomeEmailJob {
            user_id: 123,
            email: "test@example.com".to_string(),
            username: "testuser".to_string(),
        };

        // Should succeed but log warning about missing email sender
        let result = job.execute(&ctx).await;
        assert!(result.is_ok());
    }

    /// Missing database pool: report generation fails with a clear error.
    #[tokio::test]
    async fn test_generate_report_job_without_database() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };

        // Should fail without database pool
        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Database pool not configured"));
    }

    /// Invalid start date: the job must error. NOTE(review): with no pool
    /// configured the job errors on both the missing pool and the bad date,
    /// so this test only asserts that *an* error occurs — it does not pin
    /// which validation fires first.
    #[tokio::test]
    async fn test_generate_report_job_invalid_dates() {
        let ctx = JobContext::new();
        let job = GenerateReportJob {
            report_id: 456,
            user_id: 123,
            report_type: "test_report".to_string(),
            start_date: "invalid-date".to_string(),
            end_date: "2025-01-31".to_string(),
        };

        // Should fail (invalid date and/or missing database pool)
        let result = job.execute(&ctx).await;
        assert!(result.is_err());
    }

    /// Dry run needs no database and reports the simulated total
    /// (5 batches x batch_size, capped at 1000 per batch).
    #[tokio::test]
    async fn test_cleanup_job_dry_run() {
        let ctx = JobContext::new();
        let job = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };

        // Dry run should succeed without database
        let result = job.execute(&ctx).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 500); // 5 batches * 100
    }

    /// Table-name validation rejects SQL-injection attempts and empty names
    /// before any database access.
    #[tokio::test]
    async fn test_cleanup_job_validates_table_name() {
        let ctx = JobContext::new();

        // Invalid characters (classic injection payload)
        let job = CleanupOldDataJob {
            table_name: "events; DROP TABLE users;".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("only alphanumeric and underscores allowed"));

        // Empty table name
        let job = CleanupOldDataJob {
            table_name: String::new(),
            days_old: 90,
            batch_size: 100,
            dry_run: false,
        };

        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("cannot be empty"));
    }

    /// Missing file storage: image processing fails with a clear error.
    #[tokio::test]
    async fn test_process_image_job_without_storage() {
        let ctx = JobContext::new();
        let job = ProcessImageJob {
            image_id: 789,
            storage_id: "test-123".to_string(),
            sizes: vec![200, 400],
            optimize: true,
        };

        // Should fail without file storage
        let result = job.execute(&ctx).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("File storage not configured"));
    }

    /// Relative priorities across the four example jobs.
    #[test]
    fn test_job_priorities() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        // Verify priority ordering: welcome > image > report > cleanup
        assert!(welcome.priority() > image.priority());
        assert!(image.priority() > report.priority());
        assert!(report.priority() > cleanup.priority());
    }

    /// Retry budgets match each job's documented rationale.
    #[test]
    fn test_job_retry_counts() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        assert_eq!(welcome.max_retries(), 3);
        assert_eq!(report.max_retries(), 1);
        assert_eq!(cleanup.max_retries(), 0);
        assert_eq!(image.max_retries(), 2);
    }

    /// Timeouts match each job's documented rationale.
    #[test]
    fn test_job_timeouts() {
        let welcome = WelcomeEmailJob {
            user_id: 1,
            email: "test@test.com".to_string(),
            username: "test".to_string(),
        };
        let report = GenerateReportJob {
            report_id: 1,
            user_id: 1,
            report_type: "test".to_string(),
            start_date: "2025-01-01".to_string(),
            end_date: "2025-01-31".to_string(),
        };
        let cleanup = CleanupOldDataJob {
            table_name: "events".to_string(),
            days_old: 90,
            batch_size: 100,
            dry_run: true,
        };
        let image = ProcessImageJob {
            image_id: 1,
            storage_id: "test".to_string(),
            sizes: vec![200],
            optimize: false,
        };

        assert_eq!(welcome.timeout(), Duration::from_secs(30));
        assert_eq!(report.timeout(), Duration::from_secs(600));
        assert_eq!(cleanup.timeout(), Duration::from_secs(1800));
        assert_eq!(image.timeout(), Duration::from_secs(120));
    }
}