Skip to main content

what_core/jobs/
mod.rs

1//! Background job queue for non-blocking operations.
2//!
3//! A simple in-process async task queue built on Tokio channels.
4//! Jobs run as Tokio tasks. Failed jobs log errors (no retry by default).
5//!
6//! The framework uses this internally — users don't interact with it directly.
7
8use std::sync::Arc;
9use tokio::sync::mpsc;
10
11use crate::config::EmailConfig;
12use crate::email::EmailMessage;
13use crate::sessions::SessionBackend;
14
/// A unit of work handed to the background worker for asynchronous execution.
#[derive(Debug)]
pub enum Job {
    /// Purge expired sessions via the configured session backend.
    /// The worker skips this job when no session backend is configured.
    SessionCleanup,

    /// Send an email in the background.
    /// The worker drops the message (with a warning) when no email config is present.
    SendEmail { message: EmailMessage },

    /// A generic labelled job carrying a JSON payload (for future extensibility).
    /// The worker currently only logs `label`; it does not inspect `payload`.
    Custom {
        /// Label used when logging the job.
        label: String,
        // The worker never reads the payload yet, so silence the unused-field lint.
        #[allow(dead_code)]
        payload: serde_json::Value,
    },
}
32
33/// Handle for enqueuing background jobs.
34///
35/// Cheap to clone — wraps an `mpsc::Sender`.
36#[derive(Clone)]
37pub struct JobQueue {
38    tx: mpsc::Sender<Job>,
39}
40
41impl JobQueue {
42    /// Enqueue a job for background execution.
43    /// Returns `true` if the job was enqueued, `false` if the queue is full or closed.
44    pub async fn enqueue(&self, job: Job) -> bool {
45        match self.tx.try_send(job) {
46            Ok(()) => true,
47            Err(mpsc::error::TrySendError::Full(job)) => {
48                tracing::warn!("Job queue full, dropping job: {:?}", job);
49                false
50            }
51            Err(mpsc::error::TrySendError::Closed(job)) => {
52                tracing::warn!("Job queue closed, dropping job: {:?}", job);
53                false
54            }
55        }
56    }
57}
58
/// Context available to the job worker for executing jobs.
///
/// Both fields are optional; a job whose dependency is missing is skipped
/// rather than treated as an error.
struct WorkerContext {
    /// Session backend used by `Job::SessionCleanup`; `None` makes cleanup a no-op.
    sessions: Option<SessionBackend>,
    /// Email settings used by `Job::SendEmail`; `None` means emails are dropped with a warning.
    email_config: Option<EmailConfig>,
}
64
65/// Start the background job system.
66///
67/// Returns a `JobQueue` handle for enqueuing jobs.
68/// If called from within a Tokio runtime, also spawns the worker task and
69/// periodic session cleanup timer. If no runtime is available (e.g. in sync tests),
70/// creates the queue without spawning background tasks.
71pub fn start(sessions: Option<SessionBackend>, email_config: Option<EmailConfig>) -> JobQueue {
72    let (tx, rx) = mpsc::channel::<Job>(256);
73    let queue = JobQueue { tx };
74
75    // Only spawn background tasks if a Tokio runtime is available
76    if let Ok(_handle) = tokio::runtime::Handle::try_current() {
77        let ctx = Arc::new(WorkerContext {
78            sessions,
79            email_config,
80        });
81
82        // Spawn the worker that processes jobs from the channel
83        let worker_ctx = Arc::clone(&ctx);
84        tokio::spawn(async move {
85            run_worker(rx, worker_ctx).await;
86        });
87
88        // Spawn periodic session cleanup (every hour)
89        let cleanup_queue = queue.clone();
90        tokio::spawn(async move {
91            run_session_cleanup_timer(cleanup_queue).await;
92        });
93
94        tracing::info!("Background job queue started (capacity: 256)");
95    } else {
96        tracing::debug!("No Tokio runtime — job queue created without background worker");
97    }
98
99    queue
100}
101
102/// The main worker loop — receives jobs and executes them.
103async fn run_worker(mut rx: mpsc::Receiver<Job>, ctx: Arc<WorkerContext>) {
104    while let Some(job) = rx.recv().await {
105        let ctx = Arc::clone(&ctx);
106        // Spawn each job as its own task so one slow job doesn't block others
107        tokio::spawn(async move {
108            execute_job(job, &ctx).await;
109        });
110    }
111    tracing::debug!("Job worker shutting down — channel closed");
112}
113
114/// Execute a single job.
115async fn execute_job(job: Job, ctx: &WorkerContext) {
116    match job {
117        Job::SessionCleanup => {
118            if let Some(ref sessions) = ctx.sessions {
119                match sessions.cleanup_expired().await {
120                    Ok(count) if count > 0 => {
121                        tracing::info!("Session cleanup: purged {} expired sessions", count);
122                    }
123                    Ok(_) => {
124                        tracing::debug!("Session cleanup: no expired sessions");
125                    }
126                    Err(e) => {
127                        tracing::warn!("Session cleanup failed: {}", e);
128                    }
129                }
130            }
131        }
132        Job::SendEmail { message } => {
133            if let Some(ref email_config) = ctx.email_config {
134                match crate::email::send_email(email_config, &message).await {
135                    Ok(()) => {
136                        tracing::info!("Email sent to {}", message.to);
137                    }
138                    Err(e) => {
139                        tracing::error!("Failed to send email to {}: {}", message.to, e);
140                    }
141                }
142            } else {
143                tracing::warn!(
144                    "SendEmail job received but no [email] config — dropping email to {}",
145                    message.to
146                );
147            }
148        }
149        Job::Custom { label, .. } => {
150            tracing::debug!("Custom job executed: {}", label);
151        }
152    }
153}
154
155/// Periodically enqueue session cleanup jobs (every hour).
156async fn run_session_cleanup_timer(queue: JobQueue) {
157    let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
158    // Skip the first immediate tick — startup cleanup already runs in AppState::with_dev_mode
159    interval.tick().await;
160
161    loop {
162        interval.tick().await;
163        let _ = queue.enqueue(Job::SessionCleanup).await;
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Worker context with neither a session backend nor email settings.
    fn bare_context() -> WorkerContext {
        WorkerContext {
            sessions: None,
            email_config: None,
        }
    }

    /// HTML-only email fixture (no plain-text alternative).
    fn html_email(
        to: &'static str,
        subject: &'static str,
        html_body: &'static str,
    ) -> EmailMessage {
        EmailMessage {
            to: to.into(),
            subject: subject.into(),
            html_body: html_body.into(),
            text_body: None,
        }
    }

    /// A job accepted by the queue shows up on the receiving end.
    #[tokio::test]
    async fn test_job_queue_enqueue() {
        let (tx, mut rx) = mpsc::channel(16);
        let queue = JobQueue { tx };

        assert!(queue.enqueue(Job::SessionCleanup).await);

        assert!(matches!(rx.try_recv(), Ok(Job::SessionCleanup)));
    }

    /// Once capacity is exhausted, further jobs are rejected.
    #[tokio::test]
    async fn test_job_queue_full() {
        // Keep the receiver alive but never drain it.
        let (tx, _rx) = mpsc::channel(1);
        let queue = JobQueue { tx };

        let first = queue.enqueue(Job::SessionCleanup).await;
        assert!(first);

        let second = queue.enqueue(Job::SessionCleanup).await;
        assert!(!second);
    }

    /// Jobs are rejected after the receiver has gone away.
    #[tokio::test]
    async fn test_job_queue_closed() {
        let (tx, rx) = mpsc::channel(16);
        let queue = JobQueue { tx };
        drop(rx);

        assert!(!queue.enqueue(Job::SessionCleanup).await);
    }

    /// `start` inside a runtime yields a queue that accepts jobs.
    #[tokio::test]
    async fn test_start_returns_queue() {
        let queue = start(None, None);

        let job = Job::Custom {
            label: "test".to_string(),
            payload: serde_json::Value::Null,
        };
        assert!(queue.enqueue(job).await);

        // Let the spawned worker pick the job up before the runtime shuts down.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

    /// Without a session backend, cleanup is skipped and must not panic.
    #[tokio::test]
    async fn test_session_cleanup_without_sessions() {
        let ctx = bare_context();
        execute_job(Job::SessionCleanup, &ctx).await;
    }

    /// Custom jobs only log and must not panic.
    #[tokio::test]
    async fn test_custom_job_execution() {
        let ctx = bare_context();
        let job = Job::Custom {
            label: "test-job".to_string(),
            payload: serde_json::json!({"key": "value"}),
        };
        execute_job(job, &ctx).await;
    }

    /// Without email config, the job logs a warning and must not panic.
    #[tokio::test]
    async fn test_send_email_job_without_config() {
        let ctx = bare_context();
        let job = Job::SendEmail {
            message: html_email("user@example.com", "Test", "<p>Hi</p>"),
        };
        execute_job(job, &ctx).await;
    }

    /// Email jobs travel through the queue intact.
    #[tokio::test]
    async fn test_send_email_job_enqueue() {
        let (tx, mut rx) = mpsc::channel(16);
        let queue = JobQueue { tx };

        let job = Job::SendEmail {
            message: html_email("user@example.com", "Welcome", "<h1>Hi</h1>"),
        };
        assert!(queue.enqueue(job).await);

        assert!(matches!(rx.try_recv(), Ok(Job::SendEmail { .. })));
    }

    /// The derived `Debug` output names the variant (and the custom label).
    #[test]
    fn test_job_debug_format() {
        let cleanup = format!("{:?}", Job::SessionCleanup);
        assert!(cleanup.contains("SessionCleanup"));

        let email = format!(
            "{:?}",
            Job::SendEmail {
                message: html_email("a@b.com", "Hi", "<p>Hi</p>"),
            }
        );
        assert!(email.contains("SendEmail"));

        let custom = format!(
            "{:?}",
            Job::Custom {
                label: "hello".to_string(),
                payload: serde_json::Value::Null,
            }
        );
        assert!(custom.contains("Custom"));
        assert!(custom.contains("hello"));
    }

    /// Compile-time check that `JobQueue` implements `Clone`.
    #[test]
    fn test_job_queue_is_clone() {
        fn requires_clone<T: Clone>() {}
        requires_clone::<JobQueue>();
    }
}