1use std::sync::Arc;
9use tokio::sync::mpsc;
10
11use crate::config::EmailConfig;
12use crate::email::EmailMessage;
13use crate::sessions::SessionBackend;
14
15#[derive(Debug)]
17pub enum Job {
18 SessionCleanup,
20
21 SendEmail { message: EmailMessage },
23
24 Custom {
27 label: String,
28 #[allow(dead_code)]
29 payload: serde_json::Value,
30 },
31}
32
33#[derive(Clone)]
37pub struct JobQueue {
38 tx: mpsc::Sender<Job>,
39}
40
41impl JobQueue {
42 pub async fn enqueue(&self, job: Job) -> bool {
45 match self.tx.try_send(job) {
46 Ok(()) => true,
47 Err(mpsc::error::TrySendError::Full(job)) => {
48 tracing::warn!("Job queue full, dropping job: {:?}", job);
49 false
50 }
51 Err(mpsc::error::TrySendError::Closed(job)) => {
52 tracing::warn!("Job queue closed, dropping job: {:?}", job);
53 false
54 }
55 }
56 }
57}
58
59struct WorkerContext {
61 sessions: Option<SessionBackend>,
62 email_config: Option<EmailConfig>,
63}
64
65pub fn start(sessions: Option<SessionBackend>, email_config: Option<EmailConfig>) -> JobQueue {
72 let (tx, rx) = mpsc::channel::<Job>(256);
73 let queue = JobQueue { tx };
74
75 if let Ok(_handle) = tokio::runtime::Handle::try_current() {
77 let ctx = Arc::new(WorkerContext {
78 sessions,
79 email_config,
80 });
81
82 let worker_ctx = Arc::clone(&ctx);
84 tokio::spawn(async move {
85 run_worker(rx, worker_ctx).await;
86 });
87
88 let cleanup_queue = queue.clone();
90 tokio::spawn(async move {
91 run_session_cleanup_timer(cleanup_queue).await;
92 });
93
94 tracing::info!("Background job queue started (capacity: 256)");
95 } else {
96 tracing::debug!("No Tokio runtime — job queue created without background worker");
97 }
98
99 queue
100}
101
102async fn run_worker(mut rx: mpsc::Receiver<Job>, ctx: Arc<WorkerContext>) {
104 while let Some(job) = rx.recv().await {
105 let ctx = Arc::clone(&ctx);
106 tokio::spawn(async move {
108 execute_job(job, &ctx).await;
109 });
110 }
111 tracing::debug!("Job worker shutting down — channel closed");
112}
113
114async fn execute_job(job: Job, ctx: &WorkerContext) {
116 match job {
117 Job::SessionCleanup => {
118 if let Some(ref sessions) = ctx.sessions {
119 match sessions.cleanup_expired().await {
120 Ok(count) if count > 0 => {
121 tracing::info!("Session cleanup: purged {} expired sessions", count);
122 }
123 Ok(_) => {
124 tracing::debug!("Session cleanup: no expired sessions");
125 }
126 Err(e) => {
127 tracing::warn!("Session cleanup failed: {}", e);
128 }
129 }
130 }
131 }
132 Job::SendEmail { message } => {
133 if let Some(ref email_config) = ctx.email_config {
134 match crate::email::send_email(email_config, &message).await {
135 Ok(()) => {
136 tracing::info!("Email sent to {}", message.to);
137 }
138 Err(e) => {
139 tracing::error!("Failed to send email to {}: {}", message.to, e);
140 }
141 }
142 } else {
143 tracing::warn!(
144 "SendEmail job received but no [email] config — dropping email to {}",
145 message.to
146 );
147 }
148 }
149 Job::Custom { label, .. } => {
150 tracing::debug!("Custom job executed: {}", label);
151 }
152 }
153}
154
155async fn run_session_cleanup_timer(queue: JobQueue) {
157 let mut interval = tokio::time::interval(std::time::Duration::from_secs(3600));
158 interval.tick().await;
160
161 loop {
162 interval.tick().await;
163 let _ = queue.enqueue(Job::SessionCleanup).await;
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue handle over a fresh channel of the given capacity and
    /// returns it together with the receiving half.
    fn queue_with_capacity(capacity: usize) -> (JobQueue, mpsc::Receiver<Job>) {
        let (tx, rx) = mpsc::channel(capacity);
        (JobQueue { tx }, rx)
    }

    /// Worker context with neither a session backend nor an email config.
    fn empty_context() -> Arc<WorkerContext> {
        Arc::new(WorkerContext {
            sessions: None,
            email_config: None,
        })
    }

    /// Email message fixture without a plain-text body.
    fn email(to: &'static str, subject: &'static str, html_body: &'static str) -> EmailMessage {
        EmailMessage {
            to: to.into(),
            subject: subject.into(),
            html_body: html_body.into(),
            text_body: None,
        }
    }

    #[tokio::test]
    async fn test_job_queue_enqueue() {
        let (queue, mut rx) = queue_with_capacity(16);

        assert!(queue.enqueue(Job::SessionCleanup).await);

        let received = rx.try_recv().unwrap();
        assert!(matches!(received, Job::SessionCleanup));
    }

    #[tokio::test]
    async fn test_job_queue_full() {
        // The receiver is kept alive so the channel is full rather than closed.
        let (queue, _rx) = queue_with_capacity(1);

        // First job occupies the only slot.
        assert!(queue.enqueue(Job::SessionCleanup).await);

        // Second job must be rejected.
        let accepted = queue.enqueue(Job::SessionCleanup).await;
        assert!(!accepted);
    }

    #[tokio::test]
    async fn test_job_queue_closed() {
        let (queue, rx) = queue_with_capacity(16);

        // Dropping the receiver closes the channel.
        drop(rx);

        let accepted = queue.enqueue(Job::SessionCleanup).await;
        assert!(!accepted);
    }

    #[tokio::test]
    async fn test_start_returns_queue() {
        let queue = start(None, None);

        let job = Job::Custom {
            label: "test".to_string(),
            payload: serde_json::Value::Null,
        };
        let accepted = queue.enqueue(job).await;
        assert!(accepted);

        // Presumably gives the spawned worker a moment to pick up the job.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

    #[tokio::test]
    async fn test_session_cleanup_without_sessions() {
        // Must complete without panicking when no session backend is set.
        let ctx = empty_context();
        execute_job(Job::SessionCleanup, &ctx).await;
    }

    #[tokio::test]
    async fn test_custom_job_execution() {
        let ctx = empty_context();
        let job = Job::Custom {
            label: "test-job".to_string(),
            payload: serde_json::json!({"key": "value"}),
        };
        execute_job(job, &ctx).await;
    }

    #[tokio::test]
    async fn test_send_email_job_without_config() {
        // Must complete without panicking when no email config is set.
        let ctx = empty_context();
        let job = Job::SendEmail {
            message: email("user@example.com", "Test", "<p>Hi</p>"),
        };
        execute_job(job, &ctx).await;
    }

    #[tokio::test]
    async fn test_send_email_job_enqueue() {
        let (queue, mut rx) = queue_with_capacity(16);

        let message = email("user@example.com", "Welcome", "<h1>Hi</h1>");
        let accepted = queue.enqueue(Job::SendEmail { message }).await;
        assert!(accepted);

        let received = rx.try_recv().unwrap();
        assert!(matches!(received, Job::SendEmail { .. }));
    }

    #[test]
    fn test_job_debug_format() {
        let rendered = format!("{:?}", Job::SessionCleanup);
        assert!(rendered.contains("SessionCleanup"));

        let rendered = format!(
            "{:?}",
            Job::SendEmail {
                message: email("a@b.com", "Hi", "<p>Hi</p>"),
            }
        );
        assert!(rendered.contains("SendEmail"));

        let rendered = format!(
            "{:?}",
            Job::Custom {
                label: "hello".to_string(),
                payload: serde_json::Value::Null,
            }
        );
        assert!(rendered.contains("Custom"));
        assert!(rendered.contains("hello"));
    }

    #[test]
    fn test_job_queue_is_clone() {
        // Compile-time check: only builds if `JobQueue: Clone`.
        fn assert_clone<T: Clone>() {}
        assert_clone::<JobQueue>();
    }
}