1use crate::{Email, EmailError, EmailProvider, EmailResult};
6use async_trait::async_trait;
7use elif_queue::{
8 Job, JobResult, JobEntry, JobId, Priority, Queue, QueueBackend,
9 QueueResult, QueueError, JobState
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{info, warn, error};
17use uuid::Uuid;
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct EmailJob {
22 pub email: Email,
24 pub priority: EmailPriority,
26 pub max_retries: u32,
28 pub timeout_seconds: u64,
30 pub provider: Option<String>,
32 pub metadata: HashMap<String, String>,
34}
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
38pub enum EmailPriority {
39 Low = 0,
41 Normal = 1,
43 High = 2,
45 Critical = 3,
47}
48
49impl Default for EmailPriority {
50 fn default() -> Self {
51 EmailPriority::Normal
52 }
53}
54
55impl From<EmailPriority> for Priority {
56 fn from(email_priority: EmailPriority) -> Self {
57 match email_priority {
58 EmailPriority::Low => Priority::Low,
59 EmailPriority::Normal => Priority::Normal,
60 EmailPriority::High => Priority::High,
61 EmailPriority::Critical => Priority::Critical,
62 }
63 }
64}
65
66impl From<Priority> for EmailPriority {
67 fn from(priority: Priority) -> Self {
68 match priority {
69 Priority::Low => EmailPriority::Low,
70 Priority::Normal => EmailPriority::Normal,
71 Priority::High => EmailPriority::High,
72 Priority::Critical => EmailPriority::Critical,
73 }
74 }
75}
76
77impl EmailJob {
78 pub fn new(email: Email) -> Self {
80 Self {
81 email,
82 priority: EmailPriority::Normal,
83 max_retries: 3,
84 timeout_seconds: 60,
85 provider: None,
86 metadata: HashMap::new(),
87 }
88 }
89
90 pub fn with_priority(mut self, priority: EmailPriority) -> Self {
92 self.priority = priority;
93 self
94 }
95
96 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
98 self.max_retries = max_retries;
99 self
100 }
101
102 pub fn with_timeout(mut self, timeout: Duration) -> Self {
104 self.timeout_seconds = timeout.as_secs();
105 self
106 }
107
108 pub fn with_provider(mut self, provider: String) -> Self {
110 self.provider = Some(provider);
111 self
112 }
113
114 pub fn with_metadata(mut self, key: String, value: String) -> Self {
116 self.metadata.insert(key, value);
117 self
118 }
119}
120
121#[async_trait]
122impl Job for EmailJob {
123 async fn execute(&self) -> JobResult<()> {
124 info!("Processing email job for: {}", self.email.id);
125
126 Err("Email job execution not yet implemented - requires EmailJobProcessor".into())
129 }
130
131 fn job_type(&self) -> &'static str {
132 "email"
133 }
134
135 fn max_retries(&self) -> u32 {
136 self.max_retries
137 }
138
139 fn retry_delay(&self, attempt: u32) -> Duration {
140 let base_delay = Duration::from_secs(1 << attempt.min(6)); let jitter = Duration::from_millis(rand::random::<u64>() % 1000);
143 base_delay + jitter
144 }
145
146 fn timeout(&self) -> Duration {
147 Duration::from_secs(self.timeout_seconds)
148 }
149}
150
151pub struct EmailJobProcessor {
153 default_provider: Arc<dyn EmailProvider>,
155 providers: RwLock<HashMap<String, Arc<dyn EmailProvider>>>,
157 delivery_tracking: RwLock<HashMap<Uuid, EmailDeliveryStatus>>,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct EmailDeliveryStatus {
164 pub email_id: Uuid,
165 pub status: DeliveryState,
166 pub attempts: Vec<DeliveryAttempt>,
167 pub created_at: chrono::DateTime<chrono::Utc>,
168 pub updated_at: chrono::DateTime<chrono::Utc>,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
172pub enum DeliveryState {
173 Queued,
174 Processing,
175 Sent,
176 Failed,
177 Bounced,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct DeliveryAttempt {
182 pub attempt_number: u32,
183 pub attempted_at: chrono::DateTime<chrono::Utc>,
184 pub result: DeliveryAttemptResult,
185 pub provider_used: String,
186 pub error_message: Option<String>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub enum DeliveryAttemptResult {
191 Success(EmailResult),
192 Failure(String),
193 Timeout,
194}
195
196impl EmailJobProcessor {
197 pub fn new(default_provider: Arc<dyn EmailProvider>) -> Self {
199 Self {
200 default_provider,
201 providers: RwLock::new(HashMap::new()),
202 delivery_tracking: RwLock::new(HashMap::new()),
203 }
204 }
205
206 pub async fn register_provider(&self, name: String, provider: Arc<dyn EmailProvider>) {
208 self.providers.write().await.insert(name, provider);
209 }
210
211 pub async fn process(&self, job: &EmailJob) -> JobResult<()> {
213 let start_time = chrono::Utc::now();
214 info!("Processing email job {} with priority {:?}", job.email.id, job.priority);
215
216 {
218 let mut tracking = self.delivery_tracking.write().await;
219 tracking.insert(job.email.id, EmailDeliveryStatus {
220 email_id: job.email.id,
221 status: DeliveryState::Processing,
222 attempts: Vec::new(),
223 created_at: start_time,
224 updated_at: start_time,
225 });
226 }
227
228 let provider = if let Some(ref provider_name) = job.provider {
230 self.providers.read().await
231 .get(provider_name)
232 .cloned()
233 .unwrap_or_else(|| {
234 warn!("Provider '{}' not found, using default", provider_name);
235 self.default_provider.clone()
236 })
237 } else {
238 self.default_provider.clone()
239 };
240
241 let result = tokio::time::timeout(
243 Duration::from_secs(job.timeout_seconds),
244 async {
245 let provider_clone = provider.clone();
247 let email_clone = job.email.clone();
248 match tokio::spawn(async move {
249 provider_clone.send(&email_clone).await
250 }).await {
251 Ok(result) => result,
252 Err(join_error) => {
253 if join_error.is_panic() {
254 Err(EmailError::provider("unknown", "Provider panicked during send"))
255 } else {
256 Err(EmailError::provider("unknown", "Provider task was cancelled"))
257 }
258 }
259 }
260 }
261 ).await;
262
263 let attempt_result = match result {
264 Ok(Ok(email_result)) => {
265 info!("Email {} sent successfully via {}", job.email.id, provider.provider_name());
266 DeliveryAttemptResult::Success(email_result)
267 }
268 Ok(Err(email_error)) => {
269 error!("Failed to send email {}: {}", job.email.id, email_error);
270 DeliveryAttemptResult::Failure(email_error.to_string())
271 }
272 Err(_) => {
273 error!("Email {} send timed out", job.email.id);
274 DeliveryAttemptResult::Timeout
275 }
276 };
277
278 {
280 let mut tracking = self.delivery_tracking.write().await;
281 if let Some(status) = tracking.get_mut(&job.email.id) {
282 let attempt = DeliveryAttempt {
283 attempt_number: status.attempts.len() as u32 + 1,
284 attempted_at: chrono::Utc::now(),
285 result: attempt_result.clone(),
286 provider_used: provider.provider_name().to_string(),
287 error_message: match &attempt_result {
288 DeliveryAttemptResult::Failure(msg) => Some(msg.clone()),
289 DeliveryAttemptResult::Timeout => Some("Request timed out".to_string()),
290 _ => None,
291 },
292 };
293
294 status.attempts.push(attempt);
295 status.updated_at = chrono::Utc::now();
296
297 status.status = match attempt_result {
298 DeliveryAttemptResult::Success(_) => DeliveryState::Sent,
299 _ => DeliveryState::Failed,
300 };
301 }
302 }
303
304 match attempt_result {
306 DeliveryAttemptResult::Success(_) => Ok(()),
307 DeliveryAttemptResult::Failure(msg) => Err(msg.into()),
308 DeliveryAttemptResult::Timeout => Err("Email send timed out".into()),
309 }
310 }
311
312 pub async fn get_delivery_status(&self, email_id: Uuid) -> Option<EmailDeliveryStatus> {
314 self.delivery_tracking.read().await.get(&email_id).cloned()
315 }
316
317 pub async fn get_all_delivery_statuses(&self) -> Vec<EmailDeliveryStatus> {
319 self.delivery_tracking.read().await.values().cloned().collect()
320 }
321
322 pub async fn cleanup_old_records(&self, older_than: Duration) {
324 let cutoff = chrono::Utc::now() - chrono::Duration::from_std(older_than).unwrap();
325 let mut tracking = self.delivery_tracking.write().await;
326 tracking.retain(|_, status| status.updated_at > cutoff);
327 }
328}
329
330pub struct EmailQueueService<B: QueueBackend> {
332 queue: Queue<B>,
333 processor: Option<Arc<EmailJobProcessor>>,
334}
335
336impl<B: QueueBackend> EmailQueueService<B> {
337 pub fn new(backend: B) -> Self {
339 Self {
340 queue: Queue::new(backend),
341 processor: None,
342 }
343 }
344
345 pub fn with_processor(mut self, processor: Arc<EmailJobProcessor>) -> Self {
347 self.processor = Some(processor);
348 self
349 }
350
351 pub async fn enqueue(&self, email: Email) -> QueueResult<JobId> {
353 let job = EmailJob::new(email);
354 self.enqueue_job(job).await
355 }
356
357 pub async fn enqueue_with_priority(&self, email: Email, priority: EmailPriority) -> QueueResult<JobId> {
359 let job = EmailJob::new(email).with_priority(priority);
360 self.enqueue_job(job).await
361 }
362
363 pub async fn enqueue_scheduled(&self, email: Email, send_at: chrono::DateTime<chrono::Utc>) -> QueueResult<JobId> {
365 let now = chrono::Utc::now();
366 let delay = if send_at > now {
367 match (send_at - now).to_std() {
368 Ok(duration) => {
369 let max_delay = Duration::from_secs(30 * 24 * 60 * 60);
371 if duration > max_delay {
372 return Err(QueueError::Configuration(
373 format!("Scheduled time is too far in the future (max 30 days): {:?}", duration)
374 ));
375 }
376 duration
377 }
378 Err(_) => {
379 return Err(QueueError::Configuration(
380 "Invalid scheduled time: duration cannot be converted to std::time::Duration".to_string()
381 ));
382 }
383 }
384 } else {
385 Duration::from_secs(0)
386 };
387
388 let job = EmailJob::new(email);
389 self.queue.enqueue_delayed(job, delay, Some(Priority::Normal)).await
390 }
391
392 pub async fn enqueue_batch(&self, emails: Vec<Email>) -> QueueResult<Vec<JobId>> {
394 let mut job_ids = Vec::new();
395 for email in emails {
396 let job_id = self.enqueue(email).await?;
397 job_ids.push(job_id);
398 }
399 Ok(job_ids)
400 }
401
402 async fn enqueue_job(&self, job: EmailJob) -> QueueResult<JobId> {
404 let priority = Priority::from(job.priority);
405 self.queue.enqueue(job, Some(priority)).await
406 }
407
408 pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
410 self.queue.get_job(job_id).await
411 }
412
413 pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
415 self.queue.get_jobs_by_state(state, limit).await
416 }
417
418 pub async fn stats(&self) -> QueueResult<EmailQueueStats> {
420 let queue_stats = self.queue.stats().await?;
421 Ok(EmailQueueStats {
422 pending: queue_stats.pending_jobs,
423 processing: queue_stats.processing_jobs,
424 completed: queue_stats.completed_jobs,
425 failed: queue_stats.failed_jobs,
426 dead_letter: queue_stats.dead_jobs,
427 total: queue_stats.total_jobs,
428 })
429 }
430
431 pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
433 self.queue.remove_job(job_id).await
434 }
435
436 pub async fn clear(&self) -> QueueResult<()> {
438 self.queue.clear().await
439 }
440
441 pub async fn process_next_job(&self) -> QueueResult<bool> {
443 if let Some(processor) = &self.processor {
444 if let Some(job_entry) = self.queue.dequeue().await? {
445 let job_id = job_entry.id();
446
447 let result = match serde_json::from_value::<EmailJob>(job_entry.payload().clone()) {
448 Ok(job) => processor.process(&job).await,
449 Err(e) => Err(e.into()),
450 };
451
452 self.queue.complete(job_id, result).await?;
454 Ok(true)
455 } else {
456 Ok(false) }
458 } else {
459 Err(QueueError::Configuration("No email processor configured".to_string()))
460 }
461 }
462}
463
464#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct EmailQueueStats {
467 pub pending: u64,
468 pub processing: u64,
469 pub completed: u64,
470 pub failed: u64,
471 pub dead_letter: u64,
472 pub total: u64,
473}
474
475pub struct EmailWorker<B: QueueBackend> {
477 service: Arc<EmailQueueService<B>>,
478 shutdown_notify: Arc<tokio::sync::Notify>,
479 worker_id: String,
480 config: EmailWorkerConfig,
481}
482
483#[derive(Debug, Clone)]
485pub struct EmailWorkerConfig {
486 pub poll_interval: Duration,
488 pub batch_size: usize,
490 pub shutdown_timeout: Duration,
492 pub verbose: bool,
494}
495
496impl Default for EmailWorkerConfig {
497 fn default() -> Self {
498 Self {
499 poll_interval: Duration::from_secs(1),
500 batch_size: 10,
501 shutdown_timeout: Duration::from_secs(30),
502 verbose: false,
503 }
504 }
505}
506
507impl<B: QueueBackend + 'static> EmailWorker<B> {
508 pub fn new(service: Arc<EmailQueueService<B>>) -> Self {
510 Self {
511 service,
512 shutdown_notify: Arc::new(tokio::sync::Notify::new()),
513 worker_id: Uuid::new_v4().to_string(),
514 config: EmailWorkerConfig::default(),
515 }
516 }
517
518 pub fn with_config(mut self, config: EmailWorkerConfig) -> Self {
520 self.config = config;
521 self
522 }
523
524 pub async fn start(&self) -> tokio::task::JoinHandle<()> {
526 let service = self.service.clone();
527 let shutdown_notify = self.shutdown_notify.clone();
528 let worker_id = self.worker_id.clone();
529 let config = self.config.clone();
530
531 tokio::spawn(async move {
532 info!("Email worker {} started", worker_id);
533
534 loop {
535 let mut processed_count = 0;
536
537 for _ in 0..config.batch_size {
539 match service.process_next_job().await {
540 Ok(true) => processed_count += 1,
541 Ok(false) => break, Err(e) => {
543 error!("Error processing job in worker {}: {}", worker_id, e);
544 break;
545 }
546 }
547 }
548
549 if config.verbose && processed_count > 0 {
550 info!("Worker {} processed {} jobs", worker_id, processed_count);
551 }
552
553 tokio::select! {
555 _ = shutdown_notify.notified() => {
556 info!("Received shutdown signal for worker {}", worker_id);
557 break;
558 }
559 _ = tokio::time::sleep(config.poll_interval) => {
560 }
562 }
563 }
564
565 info!("Email worker {} stopped", worker_id);
566 })
567 }
568
569 pub fn stop(&self) {
571 info!("Stopping email worker {}", self.worker_id);
572 self.shutdown_notify.notify_one();
573 }
574}
575
576#[cfg(test)]
577mod tests {
578 use super::*;
579 use elif_queue::{MemoryBackend, QueueConfig};
580
581 #[test]
582 fn test_email_priority_conversion() {
583 assert_eq!(Priority::from(EmailPriority::Low), Priority::Low);
584 assert_eq!(Priority::from(EmailPriority::Normal), Priority::Normal);
585 assert_eq!(Priority::from(EmailPriority::High), Priority::High);
586 assert_eq!(Priority::from(EmailPriority::Critical), Priority::Critical);
587
588 assert_eq!(EmailPriority::from(Priority::Low), EmailPriority::Low);
589 assert_eq!(EmailPriority::from(Priority::Normal), EmailPriority::Normal);
590 assert_eq!(EmailPriority::from(Priority::High), EmailPriority::High);
591 assert_eq!(EmailPriority::from(Priority::Critical), EmailPriority::Critical);
592 }
593
594 #[test]
595 fn test_email_job_builder() {
596 let email = Email::new()
597 .from("test@example.com")
598 .to("user@example.com")
599 .subject("Test")
600 .text_body("Hello");
601
602 let job = EmailJob::new(email)
603 .with_priority(EmailPriority::High)
604 .with_max_retries(5)
605 .with_timeout(Duration::from_secs(120))
606 .with_provider("sendgrid".to_string())
607 .with_metadata("campaign".to_string(), "welcome".to_string());
608
609 assert_eq!(job.priority, EmailPriority::High);
610 assert_eq!(job.max_retries, 5);
611 assert_eq!(job.timeout_seconds, 120);
612 assert_eq!(job.provider.as_ref().unwrap(), "sendgrid");
613 assert_eq!(job.metadata.get("campaign").unwrap(), "welcome");
614 }
615
616 #[tokio::test]
617 async fn test_email_queue_service() {
618 let backend = MemoryBackend::new(QueueConfig::default());
619 let service = EmailQueueService::new(backend);
620
621 let email = Email::new()
622 .from("test@example.com")
623 .to("user@example.com")
624 .subject("Test")
625 .text_body("Hello");
626
627 let job_id = service.enqueue(email).await.unwrap();
628 assert!(!job_id.to_string().is_empty());
629
630 let stats = service.stats().await.unwrap();
631 assert_eq!(stats.total, 1);
632 assert_eq!(stats.pending, 1);
633 }
634
635 #[tokio::test]
636 async fn test_process_next_job_with_processor() {
637 use crate::providers::MockEmailProvider;
638
639 let backend = MemoryBackend::new(QueueConfig::default());
640 let provider = Arc::new(MockEmailProvider::new());
641 let processor = Arc::new(EmailJobProcessor::new(provider));
642 let service = EmailQueueService::new(backend).with_processor(processor);
643
644 let email = Email::new()
645 .from("test@example.com")
646 .to("user@example.com")
647 .subject("Test")
648 .text_body("Hello");
649
650 service.enqueue(email).await.unwrap();
651
652 let processed = service.process_next_job().await.unwrap();
654 assert!(processed);
655
656 let processed_again = service.process_next_job().await.unwrap();
658 assert!(!processed_again);
659 }
660
661 #[tokio::test]
662 async fn test_process_next_job_without_processor() {
663 let backend = MemoryBackend::new(QueueConfig::default());
664 let service = EmailQueueService::new(backend);
665
666 let email = Email::new()
667 .from("test@example.com")
668 .to("user@example.com")
669 .subject("Test")
670 .text_body("Hello");
671
672 service.enqueue(email).await.unwrap();
673
674 let result = service.process_next_job().await;
676 assert!(result.is_err());
677 assert!(result.unwrap_err().to_string().contains("No email processor configured"));
678 }
679
680 #[tokio::test]
681 async fn test_enqueue_scheduled_future_date() {
682 let backend = MemoryBackend::new(QueueConfig::default());
683 let service = EmailQueueService::new(backend);
684
685 let email = Email::new()
686 .from("test@example.com")
687 .to("user@example.com")
688 .subject("Scheduled Test")
689 .text_body("Hello");
690
691 let future_time = chrono::Utc::now() + chrono::Duration::hours(1);
692 let job_id = service.enqueue_scheduled(email, future_time).await.unwrap();
693 assert!(!job_id.to_string().is_empty());
694 }
695
696 #[tokio::test]
697 async fn test_enqueue_scheduled_too_far_future() {
698 let backend = MemoryBackend::new(QueueConfig::default());
699 let service = EmailQueueService::new(backend);
700
701 let email = Email::new()
702 .from("test@example.com")
703 .to("user@example.com")
704 .subject("Too Far Future Test")
705 .text_body("Hello");
706
707 let too_far_future = chrono::Utc::now() + chrono::Duration::days(31);
708 let result = service.enqueue_scheduled(email, too_far_future).await;
709 assert!(result.is_err());
710 assert!(result.unwrap_err().to_string().contains("too far in the future"));
711 }
712
713 #[tokio::test]
714 async fn test_enqueue_scheduled_past_date() {
715 let backend = MemoryBackend::new(QueueConfig::default());
716 let service = EmailQueueService::new(backend);
717
718 let email = Email::new()
719 .from("test@example.com")
720 .to("user@example.com")
721 .subject("Past Date Test")
722 .text_body("Hello");
723
724 let past_time = chrono::Utc::now() - chrono::Duration::hours(1);
725 let job_id = service.enqueue_scheduled(email, past_time).await.unwrap();
726 assert!(!job_id.to_string().is_empty());
727 }
728
729 #[tokio::test]
730 async fn test_email_worker_graceful_shutdown() {
731 use crate::providers::MockEmailProvider;
732
733 let backend = MemoryBackend::new(QueueConfig::default());
734 let provider = Arc::new(MockEmailProvider::new());
735 let processor = Arc::new(EmailJobProcessor::new(provider));
736 let service = Arc::new(EmailQueueService::new(backend).with_processor(processor));
737
738 let worker = EmailWorker::new(service.clone()).with_config(EmailWorkerConfig {
739 poll_interval: Duration::from_millis(100),
740 batch_size: 1,
741 shutdown_timeout: Duration::from_secs(5),
742 verbose: true,
743 });
744
745 let handle = worker.start().await;
746
747 tokio::time::sleep(Duration::from_millis(50)).await;
749
750 let start_time = std::time::Instant::now();
752 worker.stop();
753
754 tokio::time::timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();
756
757 let shutdown_time = start_time.elapsed();
758 assert!(shutdown_time < Duration::from_millis(200),
760 "Shutdown took {:?}, expected < 200ms", shutdown_time);
761 }
762
763 #[tokio::test]
764 async fn test_email_job_processor_panic_safety() {
765 use crate::providers::PanickingEmailProvider;
766
767 let provider = Arc::new(PanickingEmailProvider::new());
768 let processor = EmailJobProcessor::new(provider);
769
770 let job = EmailJob::new(Email::new()
771 .from("test@example.com")
772 .to("user@example.com")
773 .subject("Panic Test")
774 .text_body("This should handle panics"));
775
776 let result = processor.process(&job).await;
778 assert!(result.is_err());
779 assert!(result.unwrap_err().to_string().contains("panicked"));
780 }
781}