elif_email/
queue.rs

1//! Email queue integration with elif-queue
2//!
3//! This module provides background email processing capabilities using the elif-queue system.
4
5use crate::{Email, EmailError, EmailProvider, EmailResult};
6use async_trait::async_trait;
7use elif_queue::{
8    Job, JobResult, JobEntry, JobId, Priority, Queue, QueueBackend, 
9    QueueResult, QueueError, JobState
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{info, warn, error};
17use uuid::Uuid;
18
19/// Email job for background processing
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct EmailJob {
22    /// The email to be sent
23    pub email: Email,
24    /// Job priority
25    pub priority: EmailPriority,
26    /// Maximum retry attempts
27    pub max_retries: u32,
28    /// Job timeout
29    pub timeout_seconds: u64,
30    /// Provider to use for sending (optional, uses default if None)
31    pub provider: Option<String>,
32    /// Additional metadata
33    pub metadata: HashMap<String, String>,
34}
35
36/// Email priority levels
37#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
38pub enum EmailPriority {
39    /// Low priority (newsletters, bulk emails)
40    Low = 0,
41    /// Normal priority (user notifications)
42    Normal = 1,
43    /// High priority (password resets, confirmations)
44    High = 2,
45    /// Critical priority (security alerts, system notifications)
46    Critical = 3,
47}
48
49impl Default for EmailPriority {
50    fn default() -> Self {
51        EmailPriority::Normal
52    }
53}
54
55impl From<EmailPriority> for Priority {
56    fn from(email_priority: EmailPriority) -> Self {
57        match email_priority {
58            EmailPriority::Low => Priority::Low,
59            EmailPriority::Normal => Priority::Normal,
60            EmailPriority::High => Priority::High,
61            EmailPriority::Critical => Priority::Critical,
62        }
63    }
64}
65
66impl From<Priority> for EmailPriority {
67    fn from(priority: Priority) -> Self {
68        match priority {
69            Priority::Low => EmailPriority::Low,
70            Priority::Normal => EmailPriority::Normal,
71            Priority::High => EmailPriority::High,
72            Priority::Critical => EmailPriority::Critical,
73        }
74    }
75}
76
77impl EmailJob {
78    /// Create a new email job
79    pub fn new(email: Email) -> Self {
80        Self {
81            email,
82            priority: EmailPriority::Normal,
83            max_retries: 3,
84            timeout_seconds: 60,
85            provider: None,
86            metadata: HashMap::new(),
87        }
88    }
89    
90    /// Set job priority
91    pub fn with_priority(mut self, priority: EmailPriority) -> Self {
92        self.priority = priority;
93        self
94    }
95    
96    /// Set maximum retry attempts
97    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
98        self.max_retries = max_retries;
99        self
100    }
101    
102    /// Set job timeout
103    pub fn with_timeout(mut self, timeout: Duration) -> Self {
104        self.timeout_seconds = timeout.as_secs();
105        self
106    }
107    
108    /// Set specific provider to use
109    pub fn with_provider(mut self, provider: String) -> Self {
110        self.provider = Some(provider);
111        self
112    }
113    
114    /// Add metadata
115    pub fn with_metadata(mut self, key: String, value: String) -> Self {
116        self.metadata.insert(key, value);
117        self
118    }
119}
120
121#[async_trait]
122impl Job for EmailJob {
123    async fn execute(&self) -> JobResult<()> {
124        info!("Processing email job for: {}", self.email.id);
125        
126        // In a real implementation, this would get the email processor from context
127        // For now, we'll return a placeholder implementation
128        Err("Email job execution not yet implemented - requires EmailJobProcessor".into())
129    }
130    
131    fn job_type(&self) -> &'static str {
132        "email"
133    }
134    
135    fn max_retries(&self) -> u32 {
136        self.max_retries
137    }
138    
139    fn retry_delay(&self, attempt: u32) -> Duration {
140        // Exponential backoff with jitter
141        let base_delay = Duration::from_secs(1 << attempt.min(6)); // Cap at 64 seconds
142        let jitter = Duration::from_millis(rand::random::<u64>() % 1000);
143        base_delay + jitter
144    }
145    
146    fn timeout(&self) -> Duration {
147        Duration::from_secs(self.timeout_seconds)
148    }
149}
150
151/// Email job processor handles the execution of email jobs
152pub struct EmailJobProcessor {
153    /// Default email provider
154    default_provider: Arc<dyn EmailProvider>,
155    /// Named email providers
156    providers: RwLock<HashMap<String, Arc<dyn EmailProvider>>>,
157    /// Delivery tracking
158    delivery_tracking: RwLock<HashMap<Uuid, EmailDeliveryStatus>>,
159}
160
161/// Email delivery status tracking
162#[derive(Debug, Clone, Serialize, Deserialize)]
163pub struct EmailDeliveryStatus {
164    pub email_id: Uuid,
165    pub status: DeliveryState,
166    pub attempts: Vec<DeliveryAttempt>,
167    pub created_at: chrono::DateTime<chrono::Utc>,
168    pub updated_at: chrono::DateTime<chrono::Utc>,
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize)]
172pub enum DeliveryState {
173    Queued,
174    Processing,
175    Sent,
176    Failed,
177    Bounced,
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct DeliveryAttempt {
182    pub attempt_number: u32,
183    pub attempted_at: chrono::DateTime<chrono::Utc>,
184    pub result: DeliveryAttemptResult,
185    pub provider_used: String,
186    pub error_message: Option<String>,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub enum DeliveryAttemptResult {
191    Success(EmailResult),
192    Failure(String),
193    Timeout,
194}
195
196impl EmailJobProcessor {
197    /// Create a new email job processor
198    pub fn new(default_provider: Arc<dyn EmailProvider>) -> Self {
199        Self {
200            default_provider,
201            providers: RwLock::new(HashMap::new()),
202            delivery_tracking: RwLock::new(HashMap::new()),
203        }
204    }
205    
206    /// Register a named email provider
207    pub async fn register_provider(&self, name: String, provider: Arc<dyn EmailProvider>) {
208        self.providers.write().await.insert(name, provider);
209    }
210    
211    /// Process an email job
212    pub async fn process(&self, job: &EmailJob) -> JobResult<()> {
213        let start_time = chrono::Utc::now();
214        info!("Processing email job {} with priority {:?}", job.email.id, job.priority);
215        
216        // Track delivery start
217        {
218            let mut tracking = self.delivery_tracking.write().await;
219            tracking.insert(job.email.id, EmailDeliveryStatus {
220                email_id: job.email.id,
221                status: DeliveryState::Processing,
222                attempts: Vec::new(),
223                created_at: start_time,
224                updated_at: start_time,
225            });
226        }
227        
228        // Select provider
229        let provider = if let Some(ref provider_name) = job.provider {
230            self.providers.read().await
231                .get(provider_name)
232                .cloned()
233                .unwrap_or_else(|| {
234                    warn!("Provider '{}' not found, using default", provider_name);
235                    self.default_provider.clone()
236                })
237        } else {
238            self.default_provider.clone()
239        };
240        
241        // Attempt to send email (with panic safety)  
242        let result = tokio::time::timeout(
243            Duration::from_secs(job.timeout_seconds),
244            async {
245                // Wrap the provider call in a task that can handle panics
246                let provider_clone = provider.clone();
247                let email_clone = job.email.clone();
248                match tokio::spawn(async move {
249                    provider_clone.send(&email_clone).await
250                }).await {
251                    Ok(result) => result,
252                    Err(join_error) => {
253                        if join_error.is_panic() {
254                            Err(EmailError::provider("unknown", "Provider panicked during send"))
255                        } else {
256                            Err(EmailError::provider("unknown", "Provider task was cancelled"))
257                        }
258                    }
259                }
260            }
261        ).await;
262        
263        let attempt_result = match result {
264            Ok(Ok(email_result)) => {
265                info!("Email {} sent successfully via {}", job.email.id, provider.provider_name());
266                DeliveryAttemptResult::Success(email_result)
267            }
268            Ok(Err(email_error)) => {
269                error!("Failed to send email {}: {}", job.email.id, email_error);
270                DeliveryAttemptResult::Failure(email_error.to_string())
271            }
272            Err(_) => {
273                error!("Email {} send timed out", job.email.id);
274                DeliveryAttemptResult::Timeout
275            }
276        };
277        
278        // Update delivery tracking
279        {
280            let mut tracking = self.delivery_tracking.write().await;
281            if let Some(status) = tracking.get_mut(&job.email.id) {
282                let attempt = DeliveryAttempt {
283                    attempt_number: status.attempts.len() as u32 + 1,
284                    attempted_at: chrono::Utc::now(),
285                    result: attempt_result.clone(),
286                    provider_used: provider.provider_name().to_string(),
287                    error_message: match &attempt_result {
288                        DeliveryAttemptResult::Failure(msg) => Some(msg.clone()),
289                        DeliveryAttemptResult::Timeout => Some("Request timed out".to_string()),
290                        _ => None,
291                    },
292                };
293                
294                status.attempts.push(attempt);
295                status.updated_at = chrono::Utc::now();
296                
297                status.status = match attempt_result {
298                    DeliveryAttemptResult::Success(_) => DeliveryState::Sent,
299                    _ => DeliveryState::Failed,
300                };
301            }
302        }
303        
304        // Return result
305        match attempt_result {
306            DeliveryAttemptResult::Success(_) => Ok(()),
307            DeliveryAttemptResult::Failure(msg) => Err(msg.into()),
308            DeliveryAttemptResult::Timeout => Err("Email send timed out".into()),
309        }
310    }
311    
312    /// Get delivery status for an email
313    pub async fn get_delivery_status(&self, email_id: Uuid) -> Option<EmailDeliveryStatus> {
314        self.delivery_tracking.read().await.get(&email_id).cloned()
315    }
316    
317    /// Get all delivery statuses
318    pub async fn get_all_delivery_statuses(&self) -> Vec<EmailDeliveryStatus> {
319        self.delivery_tracking.read().await.values().cloned().collect()
320    }
321    
322    /// Clear old delivery tracking records
323    pub async fn cleanup_old_records(&self, older_than: Duration) {
324        let cutoff = chrono::Utc::now() - chrono::Duration::from_std(older_than).unwrap();
325        let mut tracking = self.delivery_tracking.write().await;
326        tracking.retain(|_, status| status.updated_at > cutoff);
327    }
328}
329
330/// Email queue service - high-level interface for queuing emails
331pub struct EmailQueueService<B: QueueBackend> {
332    queue: Queue<B>,
333    processor: Option<Arc<EmailJobProcessor>>,
334}
335
336impl<B: QueueBackend> EmailQueueService<B> {
337    /// Create a new email queue service
338    pub fn new(backend: B) -> Self {
339        Self {
340            queue: Queue::new(backend),
341            processor: None,
342        }
343    }
344    
345    /// Set the email job processor
346    pub fn with_processor(mut self, processor: Arc<EmailJobProcessor>) -> Self {
347        self.processor = Some(processor);
348        self
349    }
350    
351    /// Queue an email for sending
352    pub async fn enqueue(&self, email: Email) -> QueueResult<JobId> {
353        let job = EmailJob::new(email);
354        self.enqueue_job(job).await
355    }
356    
357    /// Queue an email with specific priority
358    pub async fn enqueue_with_priority(&self, email: Email, priority: EmailPriority) -> QueueResult<JobId> {
359        let job = EmailJob::new(email).with_priority(priority);
360        self.enqueue_job(job).await
361    }
362    
363    /// Queue an email for later sending
364    pub async fn enqueue_scheduled(&self, email: Email, send_at: chrono::DateTime<chrono::Utc>) -> QueueResult<JobId> {
365        let now = chrono::Utc::now();
366        let delay = if send_at > now {
367            match (send_at - now).to_std() {
368                Ok(duration) => {
369                    // Cap at maximum reasonable delay (30 days)
370                    let max_delay = Duration::from_secs(30 * 24 * 60 * 60);
371                    if duration > max_delay {
372                        return Err(QueueError::Configuration(
373                            format!("Scheduled time is too far in the future (max 30 days): {:?}", duration)
374                        ));
375                    }
376                    duration
377                }
378                Err(_) => {
379                    return Err(QueueError::Configuration(
380                        "Invalid scheduled time: duration cannot be converted to std::time::Duration".to_string()
381                    ));
382                }
383            }
384        } else {
385            Duration::from_secs(0)
386        };
387        
388        let job = EmailJob::new(email);
389        self.queue.enqueue_delayed(job, delay, Some(Priority::Normal)).await
390    }
391    
392    /// Queue a batch of emails
393    pub async fn enqueue_batch(&self, emails: Vec<Email>) -> QueueResult<Vec<JobId>> {
394        let mut job_ids = Vec::new();
395        for email in emails {
396            let job_id = self.enqueue(email).await?;
397            job_ids.push(job_id);
398        }
399        Ok(job_ids)
400    }
401    
402    /// Internal method to queue an EmailJob
403    async fn enqueue_job(&self, job: EmailJob) -> QueueResult<JobId> {
404        let priority = Priority::from(job.priority);
405        self.queue.enqueue(job, Some(priority)).await
406    }
407    
408    /// Get job by ID
409    pub async fn get_job(&self, job_id: JobId) -> QueueResult<Option<JobEntry>> {
410        self.queue.get_job(job_id).await
411    }
412    
413    /// Get jobs by state
414    pub async fn get_jobs_by_state(&self, state: JobState, limit: Option<usize>) -> QueueResult<Vec<JobEntry>> {
415        self.queue.get_jobs_by_state(state, limit).await
416    }
417    
418    /// Get queue statistics
419    pub async fn stats(&self) -> QueueResult<EmailQueueStats> {
420        let queue_stats = self.queue.stats().await?;
421        Ok(EmailQueueStats {
422            pending: queue_stats.pending_jobs,
423            processing: queue_stats.processing_jobs,
424            completed: queue_stats.completed_jobs,
425            failed: queue_stats.failed_jobs,
426            dead_letter: queue_stats.dead_jobs,
427            total: queue_stats.total_jobs,
428        })
429    }
430    
431    /// Remove a job from the queue
432    pub async fn remove_job(&self, job_id: JobId) -> QueueResult<bool> {
433        self.queue.remove_job(job_id).await
434    }
435    
436    /// Clear all jobs
437    pub async fn clear(&self) -> QueueResult<()> {
438        self.queue.clear().await
439    }
440    
441    /// Process jobs (typically called by workers)
442    pub async fn process_next_job(&self) -> QueueResult<bool> {
443        if let Some(processor) = &self.processor {
444            if let Some(job_entry) = self.queue.dequeue().await? {
445                let job_id = job_entry.id();
446
447                let result = match serde_json::from_value::<EmailJob>(job_entry.payload().clone()) {
448                    Ok(job) => processor.process(&job).await,
449                    Err(e) => Err(e.into()),
450                };
451
452                // Complete the job in the queue
453                self.queue.complete(job_id, result).await?;
454                Ok(true)
455            } else {
456                Ok(false) // No jobs available
457            }
458        } else {
459            Err(QueueError::Configuration("No email processor configured".to_string()))
460        }
461    }
462}
463
464/// Email queue statistics
465#[derive(Debug, Clone, Serialize, Deserialize)]
466pub struct EmailQueueStats {
467    pub pending: u64,
468    pub processing: u64,
469    pub completed: u64,
470    pub failed: u64,
471    pub dead_letter: u64,
472    pub total: u64,
473}
474
475/// Email worker for background processing
476pub struct EmailWorker<B: QueueBackend> {
477    service: Arc<EmailQueueService<B>>,
478    shutdown_notify: Arc<tokio::sync::Notify>,
479    worker_id: String,
480    config: EmailWorkerConfig,
481}
482
/// Tuning knobs for an email worker.
#[derive(Clone, Debug)]
pub struct EmailWorkerConfig {
    /// Pause between two polling rounds.
    pub poll_interval: Duration,
    /// Upper bound on the number of jobs handled in one polling round.
    pub batch_size: usize,
    /// Worker shutdown timeout.
    /// NOTE(review): not read anywhere in this file — confirm where it is enforced.
    pub shutdown_timeout: Duration,
    /// Log how many jobs each non-empty round processed.
    pub verbose: bool,
}
495
496impl Default for EmailWorkerConfig {
497    fn default() -> Self {
498        Self {
499            poll_interval: Duration::from_secs(1),
500            batch_size: 10,
501            shutdown_timeout: Duration::from_secs(30),
502            verbose: false,
503        }
504    }
505}
506
507impl<B: QueueBackend + 'static> EmailWorker<B> {
508    /// Create a new email worker
509    pub fn new(service: Arc<EmailQueueService<B>>) -> Self {
510        Self {
511            service,
512            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
513            worker_id: Uuid::new_v4().to_string(),
514            config: EmailWorkerConfig::default(),
515        }
516    }
517    
518    /// Create a worker with configuration
519    pub fn with_config(mut self, config: EmailWorkerConfig) -> Self {
520        self.config = config;
521        self
522    }
523    
524    /// Start the worker
525    pub async fn start(&self) -> tokio::task::JoinHandle<()> {
526        let service = self.service.clone();
527        let shutdown_notify = self.shutdown_notify.clone();
528        let worker_id = self.worker_id.clone();
529        let config = self.config.clone();
530        
531        tokio::spawn(async move {
532            info!("Email worker {} started", worker_id);
533            
534            loop {
535                let mut processed_count = 0;
536                
537                // Process up to batch_size jobs
538                for _ in 0..config.batch_size {
539                    match service.process_next_job().await {
540                        Ok(true) => processed_count += 1,
541                        Ok(false) => break, // No more jobs
542                        Err(e) => {
543                            error!("Error processing job in worker {}: {}", worker_id, e);
544                            break;
545                        }
546                    }
547                }
548                
549                if config.verbose && processed_count > 0 {
550                    info!("Worker {} processed {} jobs", worker_id, processed_count);
551                }
552                
553                // Wait for either shutdown signal or poll interval
554                tokio::select! {
555                    _ = shutdown_notify.notified() => {
556                        info!("Received shutdown signal for worker {}", worker_id);
557                        break;
558                    }
559                    _ = tokio::time::sleep(config.poll_interval) => {
560                        // Continue to next iteration
561                    }
562                }
563            }
564            
565            info!("Email worker {} stopped", worker_id);
566        })
567    }
568    
569    /// Stop the worker gracefully
570    pub fn stop(&self) {
571        info!("Stopping email worker {}", self.worker_id);
572        self.shutdown_notify.notify_one();
573    }
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use elif_queue::{MemoryBackend, QueueConfig};

    /// Build a minimal email with fixed sender/recipient and the given subject and body.
    fn sample_email(subject: &str, body: &str) -> Email {
        Email::new()
            .from("test@example.com")
            .to("user@example.com")
            .subject(subject)
            .text_body(body)
    }

    #[test]
    fn test_email_priority_conversion() {
        assert_eq!(Priority::from(EmailPriority::Low), Priority::Low);
        assert_eq!(Priority::from(EmailPriority::Normal), Priority::Normal);
        assert_eq!(Priority::from(EmailPriority::High), Priority::High);
        assert_eq!(Priority::from(EmailPriority::Critical), Priority::Critical);

        assert_eq!(EmailPriority::from(Priority::Low), EmailPriority::Low);
        assert_eq!(EmailPriority::from(Priority::Normal), EmailPriority::Normal);
        assert_eq!(EmailPriority::from(Priority::High), EmailPriority::High);
        assert_eq!(EmailPriority::from(Priority::Critical), EmailPriority::Critical);
    }

    #[test]
    fn test_email_job_builder() {
        let job = EmailJob::new(sample_email("Test", "Hello"))
            .with_priority(EmailPriority::High)
            .with_max_retries(5)
            .with_timeout(Duration::from_secs(120))
            .with_provider("sendgrid".to_string())
            .with_metadata("campaign".to_string(), "welcome".to_string());

        assert_eq!(job.priority, EmailPriority::High);
        assert_eq!(job.max_retries, 5);
        assert_eq!(job.timeout_seconds, 120);
        assert_eq!(job.provider.as_ref().unwrap(), "sendgrid");
        assert_eq!(job.metadata.get("campaign").unwrap(), "welcome");
    }

    #[tokio::test]
    async fn test_email_queue_service() {
        let backend = MemoryBackend::new(QueueConfig::default());
        let service = EmailQueueService::new(backend);

        let job_id = service.enqueue(sample_email("Test", "Hello")).await.unwrap();
        assert!(!job_id.to_string().is_empty());

        let stats = service.stats().await.unwrap();
        assert_eq!(stats.total, 1);
        assert_eq!(stats.pending, 1);
    }

    #[tokio::test]
    async fn test_process_next_job_with_processor() {
        use crate::providers::MockEmailProvider;

        let backend = MemoryBackend::new(QueueConfig::default());
        let provider = Arc::new(MockEmailProvider::new());
        let processor = Arc::new(EmailJobProcessor::new(provider));
        let service = EmailQueueService::new(backend).with_processor(processor);

        service.enqueue(sample_email("Test", "Hello")).await.unwrap();

        // Process job should work with processor configured
        let processed = service.process_next_job().await.unwrap();
        assert!(processed);

        // No more jobs to process
        let processed_again = service.process_next_job().await.unwrap();
        assert!(!processed_again);
    }

    #[tokio::test]
    async fn test_process_next_job_without_processor() {
        let backend = MemoryBackend::new(QueueConfig::default());
        let service = EmailQueueService::new(backend);

        service.enqueue(sample_email("Test", "Hello")).await.unwrap();

        // Should fail without processor
        let result = service.process_next_job().await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("No email processor configured"));
    }

    #[tokio::test]
    async fn test_enqueue_scheduled_future_date() {
        let backend = MemoryBackend::new(QueueConfig::default());
        let service = EmailQueueService::new(backend);

        let future_time = chrono::Utc::now() + chrono::Duration::hours(1);
        let job_id = service
            .enqueue_scheduled(sample_email("Scheduled Test", "Hello"), future_time)
            .await
            .unwrap();
        assert!(!job_id.to_string().is_empty());
    }

    #[tokio::test]
    async fn test_enqueue_scheduled_too_far_future() {
        let backend = MemoryBackend::new(QueueConfig::default());
        let service = EmailQueueService::new(backend);

        let too_far_future = chrono::Utc::now() + chrono::Duration::days(31);
        let result = service
            .enqueue_scheduled(sample_email("Too Far Future Test", "Hello"), too_far_future)
            .await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("too far in the future"));
    }

    #[tokio::test]
    async fn test_enqueue_scheduled_past_date() {
        let backend = MemoryBackend::new(QueueConfig::default());
        let service = EmailQueueService::new(backend);

        let past_time = chrono::Utc::now() - chrono::Duration::hours(1);
        let job_id = service
            .enqueue_scheduled(sample_email("Past Date Test", "Hello"), past_time)
            .await
            .unwrap();
        assert!(!job_id.to_string().is_empty());
    }

    #[tokio::test]
    async fn test_email_worker_graceful_shutdown() {
        use crate::providers::MockEmailProvider;

        let backend = MemoryBackend::new(QueueConfig::default());
        let provider = Arc::new(MockEmailProvider::new());
        let processor = Arc::new(EmailJobProcessor::new(provider));
        let service = Arc::new(EmailQueueService::new(backend).with_processor(processor));

        // The poll interval is deliberately much longer than the join timeout
        // below: the test can only pass if stop() interrupts the worker's
        // sleep, not if the worker merely wakes up on its own schedule.
        let poll_interval = Duration::from_secs(5);
        let worker = EmailWorker::new(service.clone()).with_config(EmailWorkerConfig {
            poll_interval,
            batch_size: 1,
            shutdown_timeout: Duration::from_secs(5),
            verbose: true,
        });

        let handle = worker.start().await;

        // Give the worker time to finish its first (empty) round and start sleeping
        tokio::time::sleep(Duration::from_millis(50)).await;

        let start_time = std::time::Instant::now();
        worker.stop();

        // Wait for worker to finish
        tokio::time::timeout(Duration::from_secs(2), handle).await.unwrap().unwrap();

        let shutdown_time = start_time.elapsed();
        assert!(shutdown_time < poll_interval,
                "Shutdown took {:?}, expected < {:?}", shutdown_time, poll_interval);
    }

    #[tokio::test]
    async fn test_email_job_processor_panic_safety() {
        use crate::providers::PanickingEmailProvider;

        let provider = Arc::new(PanickingEmailProvider::new());
        let processor = EmailJobProcessor::new(provider);

        let job = EmailJob::new(sample_email("Panic Test", "This should handle panics"));

        // Should handle panic gracefully and return error
        let result = processor.process(&job).await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("panicked"));
    }
}