elif_queue/
scheduler.rs

1//! Job scheduling system with cron expression support
2//!
3//! This module provides advanced job scheduling capabilities including:
4//! - Cron expression parsing and validation
5//! - Recurring job scheduling
6//! - Delayed execution with retry logic
7//! - Multiple retry strategies with backoff patterns
8
9use std::time::Duration;
10use std::str::FromStr;
11use std::sync::Arc;
12use chrono::{DateTime, Utc};
13use cron::Schedule;
14use serde::{Serialize, Deserialize};
15use thiserror::Error;
16use crate::{Job, JobEntry, Priority, QueueResult, QueueError};
17
/// Errors produced by the scheduling layer.
///
/// `Display` text comes from the `#[error(...)]` attributes below (via
/// `thiserror`).
#[derive(Error, Debug)]
pub enum ScheduleError {
    /// A cron expression failed to parse. `CronExpression::new` fills the
    /// payload with `"<expression>: <parser error>"`.
    #[error("Invalid cron expression: {0}")]
    InvalidCron(String),
    
    /// A schedule could not be found; the payload is presumably the schedule
    /// id. Not constructed anywhere in the visible part of this file.
    #[error("Schedule not found: {0}")]
    ScheduleNotFound(String),
    
    /// A retry configuration was rejected. Not constructed anywhere in the
    /// visible part of this file.
    #[error("Invalid retry configuration: {0}")]
    InvalidRetryConfig(String),
    
    /// A queue-layer error; `#[from]` lets `?` convert a `QueueError` into
    /// this variant automatically.
    #[error("Queue error: {0}")]
    Queue(#[from] QueueError),
}
33
/// Convenience alias: a `Result` whose error type is [`ScheduleError`].
pub type ScheduleResult<T> = Result<T, ScheduleError>;
36
/// A cron expression string paired with its parsed schedule.
///
/// Only `expression` is serialized; `schedule` is skipped and rebuilt from
/// the text when a value is deserialized.
#[derive(Debug, Clone, Serialize)]
pub struct CronExpression {
    /// The raw expression text exactly as passed to the constructor.
    expression: String,
    /// Parsed form of `expression`; excluded from serialization.
    #[serde(skip)]
    schedule: Option<Schedule>,
}
44
45impl CronExpression {
46    /// Create a new cron expression from a string
47    /// 
48    /// Supports standard 6-field cron format:
49    /// - `0 * * * * *` (every minute)
50    /// - `0 0 0 * * *` (daily at midnight)
51    /// - `0 0 */6 * * *` (every 6 hours)
52    /// - `0 0 9-17 * * 1-5` (weekdays 9-5)
53    pub fn new(expression: &str) -> ScheduleResult<Self> {
54        let schedule = Schedule::from_str(expression)
55            .map_err(|e| ScheduleError::InvalidCron(format!("{}: {}", expression, e)))?;
56            
57        Ok(CronExpression {
58            expression: expression.to_string(),
59            schedule: Some(schedule),
60        })
61    }
62    
63    /// Get the raw cron expression string
64    pub fn expression(&self) -> &str {
65        &self.expression
66    }
67    
68    /// Get the next run time after the given datetime
69    pub fn next_run_time(&self, after: DateTime<Utc>) -> Option<DateTime<Utc>> {
70        self.schedule.as_ref()?.after(&after).next()
71    }
72    
73    /// Get the next N run times after the given datetime
74    pub fn next_run_times(&self, after: DateTime<Utc>, count: usize) -> Vec<DateTime<Utc>> {
75        self.schedule.as_ref()
76            .map(|s| s.after(&after).take(count).collect())
77            .unwrap_or_default()
78    }
79    
80    /// Check if this schedule should run at the given time
81    pub fn should_run(&self, at: DateTime<Utc>) -> bool {
82        if let Some(next) = self.next_run_time(at - chrono::Duration::seconds(1)) {
83            (next - at).num_seconds().abs() < 30 // 30 second window
84        } else {
85            false
86        }
87    }
88}
89
90// Custom serialization to handle the non-serializable Schedule field
91impl<'de> Deserialize<'de> for CronExpression {
92    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
93    where
94        D: serde::Deserializer<'de>,
95    {
96        #[derive(Deserialize)]
97        struct CronExpressionData {
98            expression: String,
99        }
100        
101        let data = CronExpressionData::deserialize(deserializer)?;
102        CronExpression::new(&data.expression)
103            .map_err(|e| serde::de::Error::custom(e.to_string()))
104    }
105}
106
/// Retry strategy configuration.
///
/// Attempt numbers passed to `delay_for_attempt` are zero-based; a strategy
/// yields `None` once `attempt` reaches its limit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryStrategy {
    /// Fixed delay between retries
    Fixed {
        /// Delay returned for every attempt below `max_attempts`.
        delay: Duration,
        /// Number of attempts for which a delay is produced.
        max_attempts: u32,
    },
    /// Exponential backoff with optional jitter
    Exponential {
        /// Delay for attempt 0.
        initial_delay: Duration,
        /// Factor applied once per attempt (`initial_delay * multiplier^attempt`).
        multiplier: f64,
        /// Cap applied to the computed delay before jitter.
        max_delay: Duration,
        /// Number of attempts for which a delay is produced.
        max_attempts: u32,
        /// When `true`, the capped delay is scaled by a random factor in
        /// `[0.75, 1.25)`.
        jitter: bool,
    },
    /// Linear backoff
    Linear {
        /// Delay for attempt 0.
        initial_delay: Duration,
        /// Amount added per attempt.
        increment: Duration,
        /// Cap applied to the computed delay.
        max_delay: Duration,
        /// Number of attempts for which a delay is produced.
        max_attempts: u32,
    },
    /// Custom retry delays
    Custom {
        /// `delays[i]` is the delay for attempt `i`; the list length is the
        /// attempt limit.
        delays: Vec<Duration>,
    },
}
135
136impl Default for RetryStrategy {
137    fn default() -> Self {
138        RetryStrategy::Exponential {
139            initial_delay: Duration::from_secs(1),
140            multiplier: 2.0,
141            max_delay: Duration::from_secs(300), // 5 minutes
142            max_attempts: 3,
143            jitter: true,
144        }
145    }
146}
147
148impl RetryStrategy {
149    /// Calculate retry delay for the given attempt
150    pub fn delay_for_attempt(&self, attempt: u32) -> Option<Duration> {
151        use rand::Rng;
152        
153        match self {
154            RetryStrategy::Fixed { delay, max_attempts } => {
155                if attempt < *max_attempts {
156                    Some(*delay)
157                } else {
158                    None
159                }
160            }
161            RetryStrategy::Exponential {
162                initial_delay,
163                multiplier,
164                max_delay,
165                max_attempts,
166                jitter,
167            } => {
168                if attempt >= *max_attempts {
169                    return None;
170                }
171                
172                let delay = initial_delay.as_secs_f64() * multiplier.powi(attempt as i32);
173                let delay = delay.min(max_delay.as_secs_f64());
174                
175                let delay = if *jitter {
176                    // Add ±25% jitter
177                    let mut rng = rand::thread_rng();
178                    let jitter_factor = rng.gen_range(0.75..1.25);
179                    delay * jitter_factor
180                } else {
181                    delay
182                };
183                
184                Some(Duration::from_secs_f64(delay))
185            }
186            RetryStrategy::Linear {
187                initial_delay,
188                increment,
189                max_delay,
190                max_attempts,
191            } => {
192                if attempt >= *max_attempts {
193                    return None;
194                }
195                
196                let delay = initial_delay.as_secs() + (increment.as_secs() * attempt as u64);
197                let delay = delay.min(max_delay.as_secs());
198                Some(Duration::from_secs(delay))
199            }
200            RetryStrategy::Custom { delays } => {
201                delays.get(attempt as usize).copied()
202            }
203        }
204    }
205    
206    /// Get maximum number of retry attempts
207    pub fn max_attempts(&self) -> u32 {
208        match self {
209            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
210            RetryStrategy::Exponential { max_attempts, .. } => *max_attempts,
211            RetryStrategy::Linear { max_attempts, .. } => *max_attempts,
212            RetryStrategy::Custom { delays } => delays.len() as u32,
213        }
214    }
215}
216
/// Scheduled job configuration: what to run, when, and how to retry it.
///
/// All fields are public and (de)serialized with serde. `next_run` and
/// `last_run` are bookkeeping maintained by `update_next_run` and
/// `mark_executed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledJob {
    /// Unique identifier for this scheduled job
    pub id: String,
    /// Cron expression for when to run
    pub cron: CronExpression,
    /// Job type to execute
    pub job_type: String,
    /// Job payload (the job value serialized with `serde_json::to_value`)
    pub payload: serde_json::Value,
    /// Job priority
    pub priority: Priority,
    /// Retry strategy configuration
    pub retry_strategy: RetryStrategy,
    /// Job timeout (taken from `Job::timeout` when built via `new`)
    pub timeout: Duration,
    /// Whether this schedule is active; `should_run` is always `false` when unset
    pub enabled: bool,
    /// Optional description
    pub description: Option<String>,
    /// Next scheduled run time
    pub next_run: Option<DateTime<Utc>>,
    /// Last execution time
    pub last_run: Option<DateTime<Utc>>,
    /// When this schedule was created
    pub created_at: DateTime<Utc>,
}
245
246impl ScheduledJob {
247    /// Create a new scheduled job
248    pub fn new<T: Job>(
249        id: String,
250        cron_expr: &str,
251        job: T,
252        priority: Option<Priority>,
253        retry_strategy: Option<RetryStrategy>,
254    ) -> ScheduleResult<Self> {
255        let cron = CronExpression::new(cron_expr)?;
256        let now = Utc::now();
257        let next_run = cron.next_run_time(now);
258        
259        let job_type = job.job_type().to_string();
260        let timeout = job.timeout();
261        let payload = serde_json::to_value(job)
262            .map_err(|e| ScheduleError::Queue(QueueError::Serialization(e)))?;
263        
264        Ok(ScheduledJob {
265            id,
266            cron,
267            job_type,
268            payload,
269            priority: priority.unwrap_or_default(),
270            retry_strategy: retry_strategy.unwrap_or_default(),
271            timeout,
272            enabled: true,
273            description: None,
274            next_run,
275            last_run: None,
276            created_at: now,
277        })
278    }
279    
280    /// Update next run time
281    pub fn update_next_run(&mut self) {
282        let after = self.last_run.unwrap_or_else(Utc::now);
283        self.next_run = self.cron.next_run_time(after);
284    }
285    
286    /// Check if this job should run now
287    pub fn should_run(&self) -> bool {
288        if !self.enabled {
289            return false;
290        }
291        
292        if let Some(next_run) = self.next_run {
293            next_run <= Utc::now()
294        } else {
295            false
296        }
297    }
298    
299    /// Mark as executed
300    pub fn mark_executed(&mut self) {
301        self.last_run = Some(Utc::now());
302        self.update_next_run();
303    }
304    
305    /// Create a job entry for execution
306    pub fn create_job_entry(&self) -> QueueResult<JobEntry> {
307        JobEntry::new_with_job_type(
308            self.job_type.clone(),
309            self.payload.clone(),
310            Some(self.priority),
311            None, // No delay - execute immediately
312            self.retry_strategy.max_attempts(),
313        )
314    }
315}
316
/// Wrapper to implement Job trait for scheduled jobs
///
/// Private to this module. Nothing in the visible part of this file
/// constructs it.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ScheduledJobWrapper {
    /// Job type name carried alongside the payload.
    job_type: String,
    /// JSON payload of the wrapped job.
    payload: serde_json::Value,
    /// Value reported by `Job::max_retries`.
    max_retries: u32,
    /// Value reported by `Job::timeout`.
    timeout: Duration,
}
325
// `Job` implementation for the wrapper: `execute` is a no-op and `job_type`
// is a fixed placeholder; retries and timeout come from the stored fields.
#[async_trait::async_trait]
impl Job for ScheduledJobWrapper {
    /// No-op; always returns `Ok(())`.
    async fn execute(&self) -> crate::JobResult<()> {
        // This should not be called directly - it's just for creating JobEntry
        Ok(())
    }
    
    /// Returns a fixed placeholder, not the wrapped `job_type` field.
    fn job_type(&self) -> &'static str {
        // This is a bit of a hack - we need to return a &'static str
        // In practice, the job_type will be used from the JobEntry
        "scheduled_job_wrapper"
    }
    
    /// Returns the stored `max_retries`.
    fn max_retries(&self) -> u32 {
        self.max_retries
    }
    
    /// Returns the stored `timeout`.
    fn timeout(&self) -> Duration {
        self.timeout
    }
}
347
/// Job scheduler manages recurring jobs and their execution
///
/// All state sits behind `Arc`s so the background task spawned by `start`
/// can share it with the scheduler handle.
pub struct JobScheduler<B: crate::QueueBackend> {
    /// Queue backend that due jobs are enqueued into.
    backend: std::sync::Arc<B>,
    /// Registered schedules, keyed by `ScheduledJob::id`.
    schedules: std::sync::Arc<parking_lot::RwLock<std::collections::HashMap<String, ScheduledJob>>>,
    /// Flag polled by the background loop; cleared by `stop`.
    running: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
354
355impl<B: crate::QueueBackend + 'static> JobScheduler<B> {
356    /// Create a new job scheduler
357    pub fn new(backend: std::sync::Arc<B>) -> Self {
358        Self {
359            backend,
360            schedules: std::sync::Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
361            running: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
362        }
363    }
364    
365    /// Add a scheduled job
366    pub fn add_schedule(&self, schedule: ScheduledJob) -> ScheduleResult<()> {
367        let mut schedules = self.schedules.write();
368        schedules.insert(schedule.id.clone(), schedule);
369        Ok(())
370    }
371    
372    /// Remove a scheduled job
373    pub fn remove_schedule(&self, id: &str) -> ScheduleResult<bool> {
374        let mut schedules = self.schedules.write();
375        Ok(schedules.remove(id).is_some())
376    }
377    
378    /// Get a scheduled job by ID
379    pub fn get_schedule(&self, id: &str) -> Option<ScheduledJob> {
380        let schedules = self.schedules.read();
381        schedules.get(id).cloned()
382    }
383    
384    /// List all scheduled jobs
385    pub fn list_schedules(&self) -> Vec<ScheduledJob> {
386        let schedules = self.schedules.read();
387        schedules.values().cloned().collect()
388    }
389    
390    /// Enable or disable a scheduled job
391    pub fn set_schedule_enabled(&self, id: &str, enabled: bool) -> ScheduleResult<bool> {
392        let mut schedules = self.schedules.write();
393        if let Some(schedule) = schedules.get_mut(id) {
394            schedule.enabled = enabled;
395            Ok(true)
396        } else {
397            Ok(false)
398        }
399    }
400    
401    /// Start the scheduler loop
402    pub async fn start(&self) -> ScheduleResult<()> {
403        self.running.store(true, std::sync::atomic::Ordering::SeqCst);
404        
405        let backend = self.backend.clone();
406        let schedules = self.schedules.clone();
407        let running = self.running.clone();
408        
409        tokio::spawn(async move {
410            let mut interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
411            
412            while running.load(std::sync::atomic::Ordering::SeqCst) {
413                interval.tick().await;
414                
415                // Get schedules that should run
416                let mut due_schedules = Vec::new();
417                {
418                    let mut schedules_guard = schedules.write();
419                    for schedule in schedules_guard.values_mut() {
420                        if schedule.should_run() {
421                            schedule.mark_executed();
422                            due_schedules.push(schedule.clone());
423                        }
424                    }
425                }
426                
427                // Enqueue jobs for due schedules
428                for schedule in due_schedules {
429                    if let Ok(job_entry) = schedule.create_job_entry() {
430                        if let Err(e) = backend.enqueue(job_entry).await {
431                            tracing::error!("Failed to enqueue scheduled job {}: {}", schedule.id, e);
432                        } else {
433                            tracing::info!("Enqueued scheduled job: {}", schedule.id);
434                        }
435                    }
436                }
437            }
438        });
439        
440        Ok(())
441    }
442    
443    /// Stop the scheduler
444    pub fn stop(&self) {
445        self.running.store(false, std::sync::atomic::Ordering::SeqCst);
446    }
447    
448    /// Check if scheduler is running
449    pub fn is_running(&self) -> bool {
450        self.running.load(std::sync::atomic::Ordering::SeqCst)
451    }
452    
453    /// Get dead letter queue entries (jobs that failed permanently)
454    pub async fn get_dead_jobs(&self, limit: Option<usize>) -> QueueResult<Vec<crate::JobEntry>> {
455        self.backend.get_jobs_by_state(crate::JobState::Dead, limit).await
456    }
457    
458    /// Requeue a dead job (reset attempts and change to Pending state)
459    pub async fn requeue_dead_job(&self, job_id: crate::JobId) -> QueueResult<bool> {
460        if let Some(job) = self.backend.get_job(job_id).await? {
461            if job.state() == &crate::JobState::Dead {
462                // Use atomic requeue operation
463                self.backend.requeue_job(job_id, job).await
464            } else {
465                Ok(false)
466            }
467        } else {
468            Ok(false)
469        }
470    }
471    
472    /// Clear all dead letter queue entries
473    pub async fn clear_dead_jobs(&self) -> QueueResult<u64> {
474        // Use atomic clear operation
475        self.backend.clear_jobs_by_state(crate::JobState::Dead).await
476    }
477}
478
479
480/// Helper functions for creating common cron expressions
481pub mod cron_presets {
482    use super::CronExpression;
483    
484    /// Every minute
485    pub fn every_minute() -> CronExpression {
486        CronExpression::new("0 * * * * *").expect("Invalid 'every_minute' cron preset")
487    }
488    
489    /// Every 5 minutes
490    pub fn every_5_minutes() -> CronExpression {
491        CronExpression::new("0 */5 * * * *").unwrap()
492    }
493    
494    /// Every 15 minutes
495    pub fn every_15_minutes() -> CronExpression {
496        CronExpression::new("0 */15 * * * *").unwrap()
497    }
498    
499    /// Every 30 minutes
500    pub fn every_30_minutes() -> CronExpression {
501        CronExpression::new("0 */30 * * * *").unwrap()
502    }
503    
504    /// Every hour at minute 0
505    pub fn hourly() -> CronExpression {
506        CronExpression::new("0 0 * * * *").unwrap()
507    }
508    
509    /// Daily at midnight
510    pub fn daily() -> CronExpression {
511        CronExpression::new("0 0 0 * * *").unwrap()
512    }
513    
514    /// Weekly on Sunday at midnight
515    pub fn weekly() -> CronExpression {
516        CronExpression::new("0 0 0 * * SUN").unwrap()
517    }
518    
519    /// Monthly on the 1st at midnight
520    pub fn monthly() -> CronExpression {
521        CronExpression::new("0 0 0 1 * *").unwrap()
522    }
523    
524    /// Weekdays at 9 AM
525    pub fn weekdays_at_9am() -> CronExpression {
526        CronExpression::new("0 0 9 * * 1-5").unwrap()
527    }
528    
529    /// Custom cron expression
530    pub fn custom(expression: &str) -> Result<CronExpression, super::ScheduleError> {
531        CronExpression::new(expression)
532    }
533}
534
/// Cancellation token for cooperative job cancellation
///
/// Cloning yields a handle to the same underlying flag and notifier, so a
/// `cancel` on any clone is observed by all of them.
#[derive(Debug, Clone)]
pub struct CancellationToken {
    /// Set to `true` once `cancel` has been called.
    cancelled: Arc<std::sync::atomic::AtomicBool>,
    /// Wakes tasks awaiting cancellation.
    notify: Arc<tokio::sync::Notify>,
}
541
542impl CancellationToken {
543    /// Create a new cancellation token
544    pub fn new() -> Self {
545        Self {
546            cancelled: Arc::new(std::sync::atomic::AtomicBool::new(false)),
547            notify: Arc::new(tokio::sync::Notify::new()),
548        }
549    }
550    
551    /// Cancel the operation
552    pub fn cancel(&self) {
553        self.cancelled.store(true, std::sync::atomic::Ordering::SeqCst);
554        self.notify.notify_waiters();
555    }
556    
557    /// Check if cancellation was requested
558    pub fn is_cancelled(&self) -> bool {
559        self.cancelled.load(std::sync::atomic::Ordering::SeqCst)
560    }
561    
562    /// Wait for cancellation signal efficiently using async notification
563    pub async fn wait_for_cancellation(&self) {
564        if self.is_cancelled() {
565            return;
566        }
567        
568        self.notify.notified().await;
569    }
570    
571    /// Create a future that completes when cancelled
572    pub async fn cancelled(&self) {
573        self.wait_for_cancellation().await;
574    }
575}
576
impl Default for CancellationToken {
    /// Same as [`CancellationToken::new`]: a fresh, not-yet-cancelled token.
    fn default() -> Self {
        Self::new()
    }
}
582
/// Job cancellation manager
///
/// Tracks one [`CancellationToken`] per registered job id.
#[derive(Debug)]
pub struct JobCancellationManager {
    /// Tokens of currently registered jobs, keyed by job id.
    active_tokens: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, CancellationToken>>>,
}
588
589impl JobCancellationManager {
590    /// Create a new cancellation manager
591    pub fn new() -> Self {
592        Self {
593            active_tokens: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
594        }
595    }
596    
597    /// Register a new job with cancellation token
598    pub fn register_job(&self, job_id: crate::JobId) -> CancellationToken {
599        let token = CancellationToken::new();
600        self.active_tokens.write().insert(job_id, token.clone());
601        token
602    }
603    
604    /// Cancel a specific job
605    pub fn cancel_job(&self, job_id: crate::JobId) -> bool {
606        if let Some(token) = self.active_tokens.read().get(&job_id) {
607            token.cancel();
608            true
609        } else {
610            false
611        }
612    }
613    
614    /// Cancel all active jobs
615    pub fn cancel_all(&self) {
616        let tokens = self.active_tokens.read();
617        for token in tokens.values() {
618            token.cancel();
619        }
620    }
621    
622    /// Remove completed job from tracking
623    pub fn unregister_job(&self, job_id: crate::JobId) {
624        self.active_tokens.write().remove(&job_id);
625    }
626    
627    /// Get active job count
628    pub fn active_job_count(&self) -> usize {
629        self.active_tokens.read().len()
630    }
631    
632    /// List all active job IDs
633    pub fn active_jobs(&self) -> Vec<crate::JobId> {
634        self.active_tokens.read().keys().cloned().collect()
635    }
636}
637
impl Default for JobCancellationManager {
    /// Same as [`JobCancellationManager::new`]: a manager tracking no jobs.
    fn default() -> Self {
        Self::new()
    }
}
643
/// Extended job trait with cancellation support
///
/// Implementors must also implement [`Job`].
#[async_trait::async_trait]
pub trait CancellableJob: Job {
    /// Execute the job with cancellation support
    ///
    /// `token` is the [`CancellationToken`] the implementation can poll or
    /// await to notice a cancellation request; how it reacts is up to the
    /// implementor.
    async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()>;
}
650
/// Job execution metrics and statistics
///
/// A plain data snapshot: all fields are public and (de)serialized with
/// serde. The derived `Default` produces zeroed counters and empty maps.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetrics {
    /// Total number of jobs scheduled
    pub total_scheduled: u64,
    /// Total number of jobs executed (counted when execution starts)
    pub total_executed: u64,
    /// Total number of successful jobs
    pub successful_jobs: u64,
    /// Total number of failed jobs
    pub failed_jobs: u64,
    /// Total number of retried jobs
    pub retried_jobs: u64,
    /// Total number of timed out jobs
    pub timeout_jobs: u64,
    /// Total number of cancelled jobs
    pub cancelled_jobs: u64,
    /// Average execution time in milliseconds
    pub avg_execution_time_ms: f64,
    /// Min execution time in milliseconds
    pub min_execution_time_ms: u64,
    /// Max execution time in milliseconds
    pub max_execution_time_ms: u64,
    /// Jobs by priority (keyed by the priority's `Debug` representation)
    pub jobs_by_priority: std::collections::HashMap<String, u64>,
    /// Jobs by type
    pub jobs_by_type: std::collections::HashMap<String, u64>,
    /// Success rate (0.0 - 1.0): successful jobs over all completed jobs
    pub success_rate: f64,
    /// Average retry attempts for failed jobs
    pub avg_retry_attempts: f64,
    /// Last reset timestamp
    pub last_reset: DateTime<Utc>,
}
685
686impl JobMetrics {
687    /// Create new empty metrics
688    pub fn new() -> Self {
689        Self {
690            last_reset: Utc::now(),
691            ..Default::default()
692        }
693    }
694    
695    /// Record a scheduled job
696    pub fn record_scheduled(&mut self, job_type: &str, priority: Priority) {
697        self.total_scheduled += 1;
698        *self.jobs_by_type.entry(job_type.to_string()).or_insert(0) += 1;
699        *self.jobs_by_priority.entry(format!("{:?}", priority)).or_insert(0) += 1;
700    }
701    
702    /// Record job execution start
703    pub fn record_execution_start(&mut self) {
704        self.total_executed += 1;
705    }
706    
707    /// Record successful job completion
708    pub fn record_success(&mut self, execution_time_ms: u64) {
709        self.successful_jobs += 1;
710        self.update_execution_time(execution_time_ms);
711        self.update_success_rate();
712    }
713    
714    /// Record job failure
715    pub fn record_failure(&mut self, execution_time_ms: u64, retry_attempts: u32) {
716        self.failed_jobs += 1;
717        self.update_execution_time(execution_time_ms);
718        self.update_success_rate();
719        self.update_retry_attempts(retry_attempts);
720    }
721    
722    /// Record job retry
723    pub fn record_retry(&mut self) {
724        self.retried_jobs += 1;
725    }
726    
727    /// Record job timeout
728    pub fn record_timeout(&mut self, execution_time_ms: u64) {
729        self.timeout_jobs += 1;
730        self.update_execution_time(execution_time_ms);
731    }
732    
733    /// Record job cancellation
734    pub fn record_cancellation(&mut self, execution_time_ms: u64) {
735        self.cancelled_jobs += 1;
736        self.update_execution_time(execution_time_ms);
737    }
738    
739    /// Reset all metrics
740    pub fn reset(&mut self) {
741        *self = Self::new();
742    }
743    
744    /// Update execution time statistics
745    fn update_execution_time(&mut self, execution_time_ms: u64) {
746        if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
747            self.min_execution_time_ms = execution_time_ms;
748        }
749        if execution_time_ms > self.max_execution_time_ms {
750            self.max_execution_time_ms = execution_time_ms;
751        }
752        
753        // Update average using numerically stable running average algorithm
754        let completed_jobs = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
755        if completed_jobs > 0 {
756            let new_sample = execution_time_ms as f64;
757            // Use a numerically stable algorithm for running average: new_avg = old_avg + (new_sample - old_avg) / sample_count
758            self.avg_execution_time_ms += (new_sample - self.avg_execution_time_ms) / completed_jobs as f64;
759        }
760    }
761    
762    /// Update success rate
763    fn update_success_rate(&mut self) {
764        let total_completed = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
765        if total_completed > 0 {
766            self.success_rate = self.successful_jobs as f64 / total_completed as f64;
767        }
768    }
769    
770    /// Update retry attempts average
771    fn update_retry_attempts(&mut self, attempts: u32) {
772        if self.failed_jobs > 0 {
773            let new_sample = attempts as f64;
774            // Use a numerically stable algorithm for running average: new_avg = old_avg + (new_sample - old_avg) / sample_count
775            self.avg_retry_attempts += (new_sample - self.avg_retry_attempts) / self.failed_jobs as f64;
776        }
777    }
778}
779
/// Job metrics collector
///
/// Wraps a [`JobMetrics`] behind a lock and tracks start times of in-flight
/// executions so elapsed time can be computed when a job finishes.
#[derive(Debug)]
pub struct JobMetricsCollector {
    /// Aggregated metrics.
    metrics: Arc<parking_lot::RwLock<JobMetrics>>,
    /// Start instant of each execution that has begun but not yet finished.
    active_executions: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, std::time::Instant>>>,
}
786
787impl JobMetricsCollector {
788    /// Create a new metrics collector
789    pub fn new() -> Self {
790        Self {
791            metrics: Arc::new(parking_lot::RwLock::new(JobMetrics::new())),
792            active_executions: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
793        }
794    }
795    
796    /// Record a job being scheduled
797    pub fn record_job_scheduled(&self, job_type: &str, priority: Priority) {
798        let mut metrics = self.metrics.write();
799        metrics.record_scheduled(job_type, priority);
800    }
801    
802    /// Record job execution start
803    pub fn record_execution_start(&self, job_id: crate::JobId) {
804        let mut metrics = self.metrics.write();
805        let mut executions = self.active_executions.write();
806        
807        metrics.record_execution_start();
808        executions.insert(job_id, std::time::Instant::now());
809    }
810    
811    /// Record job completion (success)
812    pub fn record_job_success(&self, job_id: crate::JobId) {
813        let execution_time = self.get_and_remove_execution_time(job_id);
814        let mut metrics = self.metrics.write();
815        metrics.record_success(execution_time);
816    }
817    
818    /// Record job failure
819    pub fn record_job_failure(&self, job_id: crate::JobId, retry_attempts: u32) {
820        let execution_time = self.get_and_remove_execution_time(job_id);
821        let mut metrics = self.metrics.write();
822        metrics.record_failure(execution_time, retry_attempts);
823    }
824    
825    /// Record job retry
826    pub fn record_job_retry(&self, _job_id: crate::JobId) {
827        // Don't remove execution time for retries
828        let mut metrics = self.metrics.write();
829        metrics.record_retry();
830    }
831    
832    /// Record job timeout
833    pub fn record_job_timeout(&self, job_id: crate::JobId) {
834        let execution_time = self.get_and_remove_execution_time(job_id);
835        let mut metrics = self.metrics.write();
836        metrics.record_timeout(execution_time);
837    }
838    
839    /// Record job cancellation
840    pub fn record_job_cancellation(&self, job_id: crate::JobId) {
841        let execution_time = self.get_and_remove_execution_time(job_id);
842        let mut metrics = self.metrics.write();
843        metrics.record_cancellation(execution_time);
844    }
845    
846    /// Get current metrics snapshot
847    pub fn get_metrics(&self) -> JobMetrics {
848        self.metrics.read().clone()
849    }
850    
851    /// Reset all metrics
852    pub fn reset_metrics(&self) {
853        let mut metrics = self.metrics.write();
854        let mut executions = self.active_executions.write();
855        
856        metrics.reset();
857        executions.clear();
858    }
859    
860    /// Get and remove execution time for a job
861    fn get_and_remove_execution_time(&self, job_id: crate::JobId) -> u64 {
862        let mut executions = self.active_executions.write();
863        if let Some(start_time) = executions.remove(&job_id) {
864            start_time.elapsed().as_millis() as u64
865        } else {
866            0
867        }
868    }
869    
870    /// Get active executions count
871    pub fn active_executions_count(&self) -> usize {
872        self.active_executions.read().len()
873    }
874}
875
impl Default for JobMetricsCollector {
    /// Same as [`JobMetricsCollector::new`]: fresh metrics, nothing in flight.
    fn default() -> Self {
        Self::new()
    }
}
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use std::time::Duration;
886    use crate::{MemoryBackend, QueueConfig};
887    
888    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
889    struct TestJob {
890        message: String,
891    }
892    
893    #[async_trait::async_trait]
894    impl Job for TestJob {
895        async fn execute(&self) -> crate::JobResult<()> {
896            println!("Executing test job: {}", self.message);
897            Ok(())
898        }
899        
900        fn job_type(&self) -> &'static str {
901            "test_job"
902        }
903    }
904    
905    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
906    struct CancellableTestJob {
907        message: String,
908        sleep_duration: Duration,
909    }
910    
911    #[async_trait::async_trait]
912    impl Job for CancellableTestJob {
913        async fn execute(&self) -> crate::JobResult<()> {
914            tokio::time::sleep(self.sleep_duration).await;
915            Ok(())
916        }
917        
918        fn job_type(&self) -> &'static str {
919            "cancellable_test_job"
920        }
921    }
922    
923    #[async_trait::async_trait]
924    impl CancellableJob for CancellableTestJob {
925        async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()> {
926            tokio::select! {
927                _ = tokio::time::sleep(self.sleep_duration) => {
928                    println!("Job completed: {}", self.message);
929                    Ok(())
930                }
931                _ = token.cancelled() => {
932                    println!("Job cancelled: {}", self.message);
933                    Err("Job was cancelled".into())
934                }
935            }
936        }
937    }
938    
939    #[test]
940    fn test_cron_expression_validation() {
941        // Valid cron expressions
942        assert!(CronExpression::new("0 0 0 * * *").is_ok()); // Daily at midnight
943        assert!(CronExpression::new("0 */5 * * * *").is_ok()); // Every 5 minutes
944        assert!(CronExpression::new("0 0 9-17 * * 1-5").is_ok()); // Weekdays 9-5
945        
946        // Invalid cron expressions
947        assert!(CronExpression::new("invalid").is_err());
948        assert!(CronExpression::new("* * * * *").is_err()); // 5 fields not supported
949    }
950    
951    #[test]
952    fn test_cron_next_run_time() {
953        let cron = CronExpression::new("0 0 0 * * *").unwrap(); // Daily at midnight
954        let now = Utc::now();
955        let next = cron.next_run_time(now);
956        
957        assert!(next.is_some());
958        assert!(next.unwrap() > now);
959    }
960    
961    #[test]
962    fn test_cron_presets() {
963        // Test preset expressions
964        assert!(cron_presets::every_minute().next_run_time(Utc::now()).is_some());
965        assert!(cron_presets::hourly().next_run_time(Utc::now()).is_some());
966        assert!(cron_presets::daily().next_run_time(Utc::now()).is_some());
967        assert!(cron_presets::weekly().next_run_time(Utc::now()).is_some());
968        assert!(cron_presets::monthly().next_run_time(Utc::now()).is_some());
969        assert!(cron_presets::weekdays_at_9am().next_run_time(Utc::now()).is_some());
970    }
971    
972    #[test]
973    fn test_retry_strategy_exponential() {
974        let strategy = RetryStrategy::Exponential {
975            initial_delay: Duration::from_secs(1),
976            multiplier: 2.0,
977            max_delay: Duration::from_secs(60),
978            max_attempts: 3,
979            jitter: false,
980        };
981        
982        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
983        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(2)));
984        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(4)));
985        assert_eq!(strategy.delay_for_attempt(3), None); // Exceeds max attempts
986        assert_eq!(strategy.max_attempts(), 3);
987    }
988    
989    #[test]
990    fn test_retry_strategy_linear() {
991        let strategy = RetryStrategy::Linear {
992            initial_delay: Duration::from_secs(5),
993            increment: Duration::from_secs(10),
994            max_delay: Duration::from_secs(60),
995            max_attempts: 4,
996        };
997        
998        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(5)));
999        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(15)));
1000        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(25)));
1001        assert_eq!(strategy.delay_for_attempt(3), Some(Duration::from_secs(35)));
1002        assert_eq!(strategy.delay_for_attempt(4), None); // Exceeds max attempts
1003        assert_eq!(strategy.max_attempts(), 4);
1004    }
1005    
1006    #[test]
1007    fn test_retry_strategy_fixed() {
1008        let strategy = RetryStrategy::Fixed {
1009            delay: Duration::from_secs(10),
1010            max_attempts: 2,
1011        };
1012        
1013        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(10)));
1014        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(10)));
1015        assert_eq!(strategy.delay_for_attempt(2), None); // Exceeds max attempts
1016        assert_eq!(strategy.max_attempts(), 2);
1017    }
1018    
1019    #[test]
1020    fn test_retry_strategy_custom() {
1021        let strategy = RetryStrategy::Custom {
1022            delays: vec![
1023                Duration::from_secs(1),
1024                Duration::from_secs(5),
1025                Duration::from_secs(30),
1026            ],
1027        };
1028        
1029        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
1030        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(5)));
1031        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(30)));
1032        assert_eq!(strategy.delay_for_attempt(3), None); // No more delays
1033        assert_eq!(strategy.max_attempts(), 3);
1034    }
1035    
1036    #[test]
1037    fn test_scheduled_job_creation() {
1038        let job = TestJob {
1039            message: "Hello, World!".to_string(),
1040        };
1041        
1042        let scheduled = ScheduledJob::new(
1043            "test_schedule".to_string(),
1044            "0 0 0 * * *", // Daily at midnight
1045            job,
1046            Some(Priority::High),
1047            None,
1048        ).unwrap();
1049        
1050        assert_eq!(scheduled.id, "test_schedule");
1051        assert_eq!(scheduled.job_type, "test_job");
1052        assert_eq!(scheduled.priority, Priority::High);
1053        assert!(scheduled.enabled);
1054        assert!(scheduled.next_run.is_some());
1055    }
1056    
1057    #[tokio::test]
1058    async fn test_job_scheduler_basic() {
1059        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1060        let scheduler = JobScheduler::new(backend);
1061        
1062        let job = TestJob {
1063            message: "Scheduled job".to_string(),
1064        };
1065        
1066        let scheduled = ScheduledJob::new(
1067            "test_schedule".to_string(),
1068            "0 * * * * *", // Every minute (for testing)
1069            job,
1070            Some(Priority::Normal),
1071            None,
1072        ).unwrap();
1073        
1074        scheduler.add_schedule(scheduled).unwrap();
1075        
1076        let schedules = scheduler.list_schedules();
1077        assert_eq!(schedules.len(), 1);
1078        assert_eq!(schedules[0].id, "test_schedule");
1079        
1080        // Test retrieval
1081        let retrieved = scheduler.get_schedule("test_schedule");
1082        assert!(retrieved.is_some());
1083        assert_eq!(retrieved.unwrap().id, "test_schedule");
1084        
1085        // Test removal
1086        assert!(scheduler.remove_schedule("test_schedule").unwrap());
1087        assert!(scheduler.get_schedule("test_schedule").is_none());
1088    }
1089    
1090    #[tokio::test]
1091    async fn test_cancellation_token() {
1092        let token = CancellationToken::new();
1093        
1094        // Initially not cancelled
1095        assert!(!token.is_cancelled());
1096        
1097        // Test wait_for_cancellation completes when already cancelled
1098        token.cancel();
1099        assert!(token.is_cancelled());
1100        
1101        // Should complete immediately since already cancelled
1102        token.wait_for_cancellation().await;
1103        
1104        // Test cloning preserves state
1105        let cloned = token.clone();
1106        assert!(cloned.is_cancelled());
1107        cloned.wait_for_cancellation().await; // Should also complete immediately
1108    }
1109    
1110    #[tokio::test]
1111    async fn test_cancellation_token_async_notification() {
1112        let token = CancellationToken::new();
1113        let token_clone = token.clone();
1114        
1115        // Spawn a task that waits for cancellation
1116        let wait_task = tokio::spawn(async move {
1117            token_clone.wait_for_cancellation().await;
1118            "cancelled"
1119        });
1120        
1121        // Give the wait task a moment to start waiting
1122        tokio::time::sleep(Duration::from_millis(1)).await;
1123        
1124        // The task should still be running (not completed yet)
1125        assert!(!wait_task.is_finished());
1126        
1127        // Cancel the token - this should wake up the waiting task
1128        token.cancel();
1129        
1130        // The task should complete quickly now
1131        let result = tokio::time::timeout(Duration::from_millis(100), wait_task).await;
1132        assert!(result.is_ok());
1133        assert_eq!(result.unwrap().unwrap(), "cancelled");
1134    }
1135    
1136    #[test]
1137    fn test_job_cancellation_manager() {
1138        let manager = JobCancellationManager::new();
1139        let job_id = crate::JobId::new_v4();
1140        
1141        // Register a job
1142        let token = manager.register_job(job_id);
1143        assert_eq!(manager.active_job_count(), 1);
1144        assert!(manager.active_jobs().contains(&job_id));
1145        
1146        // Token should not be cancelled initially
1147        assert!(!token.is_cancelled());
1148        
1149        // Cancel the specific job
1150        assert!(manager.cancel_job(job_id));
1151        assert!(token.is_cancelled());
1152        
1153        // Unregister the job
1154        manager.unregister_job(job_id);
1155        assert_eq!(manager.active_job_count(), 0);
1156        
1157        // Cancelling non-existent job should return false
1158        assert!(!manager.cancel_job(job_id));
1159    }
1160    
1161    #[test]
1162    fn test_job_metrics() {
1163        let mut metrics = JobMetrics::new();
1164        
1165        // Record scheduled jobs
1166        metrics.record_scheduled("test_job", Priority::High);
1167        metrics.record_scheduled("test_job", Priority::Normal);
1168        metrics.record_scheduled("email_job", Priority::High);
1169        
1170        assert_eq!(metrics.total_scheduled, 3);
1171        assert_eq!(*metrics.jobs_by_type.get("test_job").unwrap(), 2);
1172        assert_eq!(*metrics.jobs_by_type.get("email_job").unwrap(), 1);
1173        assert_eq!(*metrics.jobs_by_priority.get("High").unwrap(), 2);
1174        assert_eq!(*metrics.jobs_by_priority.get("Normal").unwrap(), 1);
1175        
1176        // Record executions
1177        metrics.record_execution_start();
1178        metrics.record_execution_start();
1179        assert_eq!(metrics.total_executed, 2);
1180        
1181        // Record success
1182        metrics.record_success(100); // 100ms execution time
1183        assert_eq!(metrics.successful_jobs, 1);
1184        assert_eq!(metrics.min_execution_time_ms, 100);
1185        assert_eq!(metrics.max_execution_time_ms, 100);
1186        assert_eq!(metrics.avg_execution_time_ms, 100.0); // Only one completed job
1187        
1188        // Record failure with retry attempts
1189        metrics.record_failure(200, 2); // 200ms, 2 retry attempts
1190        assert_eq!(metrics.failed_jobs, 1);
1191        assert_eq!(metrics.avg_retry_attempts, 2.0);
1192        assert_eq!(metrics.max_execution_time_ms, 200);
1193        assert_eq!(metrics.avg_execution_time_ms, 150.0); // (100 + 200) / 2
1194        
1195        // Check success rate
1196        assert_eq!(metrics.success_rate, 0.5); // 1 success out of 2 total
1197    }
1198    
1199    #[test]
1200    fn test_job_metrics_collector() {
1201        let collector = JobMetricsCollector::new();
1202        let job_id = crate::JobId::new_v4();
1203        
1204        // Record scheduled job
1205        collector.record_job_scheduled("test_job", Priority::High);
1206        
1207        // Record execution
1208        collector.record_execution_start(job_id);
1209        assert_eq!(collector.active_executions_count(), 1);
1210        
1211        // Record success
1212        std::thread::sleep(Duration::from_millis(10)); // Small delay for timing
1213        collector.record_job_success(job_id);
1214        assert_eq!(collector.active_executions_count(), 0);
1215        
1216        let metrics = collector.get_metrics();
1217        assert_eq!(metrics.total_scheduled, 1);
1218        assert_eq!(metrics.total_executed, 1);
1219        assert_eq!(metrics.successful_jobs, 1);
1220        assert_eq!(metrics.success_rate, 1.0);
1221        assert!(metrics.min_execution_time_ms >= 10);
1222    }
1223    
1224    #[tokio::test]
1225    async fn test_atomic_clear_dead_jobs() {
1226        use crate::{MemoryBackend, QueueConfig, JobEntry, Priority, QueueBackend};
1227        
1228        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1229        let scheduler = JobScheduler::new(backend.clone());
1230        
1231        // Create some test jobs and mark them as dead
1232        let job1 = TestJob { message: "job1".to_string() };
1233        let job2 = TestJob { message: "job2".to_string() };
1234        let job3 = TestJob { message: "job3".to_string() };
1235        
1236        let mut entry1 = JobEntry::new(job1, Some(Priority::Normal), None).unwrap();
1237        let mut entry2 = JobEntry::new(job2, Some(Priority::High), None).unwrap();
1238        let mut entry3 = JobEntry::new(job3, Some(Priority::Low), None).unwrap();
1239        
1240        // Mark jobs as failed beyond retry limit (dead) - using a fixed number since max_retries is not accessible
1241        for _ in 0..=3 {
1242            entry1.mark_failed("Test failure".to_string());
1243            entry2.mark_failed("Test failure".to_string());
1244            entry3.mark_failed("Test failure".to_string());
1245        }
1246        
1247        // Enqueue the dead jobs
1248        backend.enqueue(entry1).await.unwrap();
1249        backend.enqueue(entry2).await.unwrap();
1250        backend.enqueue(entry3).await.unwrap();
1251        
1252        // Verify dead jobs exist
1253        let dead_jobs_before = scheduler.get_dead_jobs(None).await.unwrap();
1254        assert_eq!(dead_jobs_before.len(), 3);
1255        
1256        // Clear dead jobs atomically
1257        let cleared_count = scheduler.clear_dead_jobs().await.unwrap();
1258        assert_eq!(cleared_count, 3);
1259        
1260        // Verify no dead jobs remain
1261        let dead_jobs_after = scheduler.get_dead_jobs(None).await.unwrap();
1262        assert_eq!(dead_jobs_after.len(), 0);
1263        
1264        // Verify stats are updated correctly
1265        let stats = backend.stats().await.unwrap();
1266        assert_eq!(stats.dead_jobs, 0);
1267    }
1268    
1269    #[tokio::test]
1270    async fn test_atomic_requeue_dead_job() {
1271        use crate::{MemoryBackend, QueueConfig, JobEntry, JobState, Priority, QueueBackend};
1272        
1273        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1274        let scheduler = JobScheduler::new(backend.clone());
1275        
1276        // Create and enqueue a dead job
1277        let job = TestJob { message: "dead job".to_string() };
1278        let mut entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
1279        let job_id = entry.id();
1280        
1281        // Mark as dead - using a fixed number since max_retries is not accessible
1282        for _ in 0..=3 {
1283            entry.mark_failed("Test failure".to_string());
1284        }
1285        backend.enqueue(entry).await.unwrap();
1286        
1287        // Verify job is dead
1288        let stats_before = backend.stats().await.unwrap();
1289        assert_eq!(stats_before.dead_jobs, 1);
1290        assert_eq!(stats_before.pending_jobs, 0);
1291        
1292        // Requeue the dead job atomically
1293        let requeued = scheduler.requeue_dead_job(job_id).await.unwrap();
1294        assert!(requeued);
1295        
1296        // Verify job is now pending
1297        let stats_after = backend.stats().await.unwrap();
1298        assert_eq!(stats_after.dead_jobs, 0);
1299        assert_eq!(stats_after.pending_jobs, 1);
1300        
1301        // Verify job state was reset
1302        let job_entry = backend.get_job(job_id).await.unwrap().unwrap();
1303        assert_eq!(job_entry.state(), &JobState::Pending);
1304        assert_eq!(job_entry.attempts(), 0);
1305    }
1306}