1use std::time::Duration;
10use std::str::FromStr;
11use std::sync::Arc;
12use chrono::{DateTime, Utc};
13use cron::Schedule;
14use serde::{Serialize, Deserialize};
15use thiserror::Error;
16use crate::{Job, JobEntry, Priority, QueueResult, QueueError};
17
18#[derive(Error, Debug)]
20pub enum ScheduleError {
21    #[error("Invalid cron expression: {0}")]
22    InvalidCron(String),
23    
24    #[error("Schedule not found: {0}")]
25    ScheduleNotFound(String),
26    
27    #[error("Invalid retry configuration: {0}")]
28    InvalidRetryConfig(String),
29    
30    #[error("Queue error: {0}")]
31    Queue(#[from] QueueError),
32}
33
34pub type ScheduleResult<T> = Result<T, ScheduleError>;
36
37#[derive(Debug, Clone, Serialize)]
39pub struct CronExpression {
40    expression: String,
41    #[serde(skip)]
42    schedule: Option<Schedule>,
43}
44
45impl CronExpression {
46    pub fn new(expression: &str) -> ScheduleResult<Self> {
54        let schedule = Schedule::from_str(expression)
55            .map_err(|e| ScheduleError::InvalidCron(format!("{}: {}", expression, e)))?;
56            
57        Ok(CronExpression {
58            expression: expression.to_string(),
59            schedule: Some(schedule),
60        })
61    }
62    
63    pub fn expression(&self) -> &str {
65        &self.expression
66    }
67    
68    pub fn next_run_time(&self, after: DateTime<Utc>) -> Option<DateTime<Utc>> {
70        self.schedule.as_ref()?.after(&after).next()
71    }
72    
73    pub fn next_run_times(&self, after: DateTime<Utc>, count: usize) -> Vec<DateTime<Utc>> {
75        self.schedule.as_ref()
76            .map(|s| s.after(&after).take(count).collect())
77            .unwrap_or_default()
78    }
79    
80    pub fn should_run(&self, at: DateTime<Utc>) -> bool {
82        if let Some(next) = self.next_run_time(at - chrono::Duration::seconds(1)) {
83            (next - at).num_seconds().abs() < 30 } else {
85            false
86        }
87    }
88}
89
90impl<'de> Deserialize<'de> for CronExpression {
92    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
93    where
94        D: serde::Deserializer<'de>,
95    {
96        #[derive(Deserialize)]
97        struct CronExpressionData {
98            expression: String,
99        }
100        
101        let data = CronExpressionData::deserialize(deserializer)?;
102        CronExpression::new(&data.expression)
103            .map_err(|e| serde::de::Error::custom(e.to_string()))
104    }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub enum RetryStrategy {
110    Fixed {
112        delay: Duration,
113        max_attempts: u32,
114    },
115    Exponential {
117        initial_delay: Duration,
118        multiplier: f64,
119        max_delay: Duration,
120        max_attempts: u32,
121        jitter: bool,
122    },
123    Linear {
125        initial_delay: Duration,
126        increment: Duration,
127        max_delay: Duration,
128        max_attempts: u32,
129    },
130    Custom {
132        delays: Vec<Duration>,
133    },
134}
135
136impl Default for RetryStrategy {
137    fn default() -> Self {
138        RetryStrategy::Exponential {
139            initial_delay: Duration::from_secs(1),
140            multiplier: 2.0,
141            max_delay: Duration::from_secs(300), max_attempts: 3,
143            jitter: true,
144        }
145    }
146}
147
148impl RetryStrategy {
149    pub fn delay_for_attempt(&self, attempt: u32) -> Option<Duration> {
151        use rand::Rng;
152        
153        match self {
154            RetryStrategy::Fixed { delay, max_attempts } => {
155                if attempt < *max_attempts {
156                    Some(*delay)
157                } else {
158                    None
159                }
160            }
161            RetryStrategy::Exponential {
162                initial_delay,
163                multiplier,
164                max_delay,
165                max_attempts,
166                jitter,
167            } => {
168                if attempt >= *max_attempts {
169                    return None;
170                }
171                
172                let delay = initial_delay.as_secs_f64() * multiplier.powi(attempt as i32);
173                let delay = delay.min(max_delay.as_secs_f64());
174                
175                let delay = if *jitter {
176                    let mut rng = rand::thread_rng();
178                    let jitter_factor = rng.gen_range(0.75..1.25);
179                    delay * jitter_factor
180                } else {
181                    delay
182                };
183                
184                Some(Duration::from_secs_f64(delay))
185            }
186            RetryStrategy::Linear {
187                initial_delay,
188                increment,
189                max_delay,
190                max_attempts,
191            } => {
192                if attempt >= *max_attempts {
193                    return None;
194                }
195                
196                let delay = initial_delay.as_secs() + (increment.as_secs() * attempt as u64);
197                let delay = delay.min(max_delay.as_secs());
198                Some(Duration::from_secs(delay))
199            }
200            RetryStrategy::Custom { delays } => {
201                delays.get(attempt as usize).copied()
202            }
203        }
204    }
205    
206    pub fn max_attempts(&self) -> u32 {
208        match self {
209            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
210            RetryStrategy::Exponential { max_attempts, .. } => *max_attempts,
211            RetryStrategy::Linear { max_attempts, .. } => *max_attempts,
212            RetryStrategy::Custom { delays } => delays.len() as u32,
213        }
214    }
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct ScheduledJob {
220    pub id: String,
222    pub cron: CronExpression,
224    pub job_type: String,
226    pub payload: serde_json::Value,
228    pub priority: Priority,
230    pub retry_strategy: RetryStrategy,
232    pub timeout: Duration,
234    pub enabled: bool,
236    pub description: Option<String>,
238    pub next_run: Option<DateTime<Utc>>,
240    pub last_run: Option<DateTime<Utc>>,
242    pub created_at: DateTime<Utc>,
244}
245
246impl ScheduledJob {
247    pub fn new<T: Job>(
249        id: String,
250        cron_expr: &str,
251        job: T,
252        priority: Option<Priority>,
253        retry_strategy: Option<RetryStrategy>,
254    ) -> ScheduleResult<Self> {
255        let cron = CronExpression::new(cron_expr)?;
256        let now = Utc::now();
257        let next_run = cron.next_run_time(now);
258        
259        let job_type = job.job_type().to_string();
260        let timeout = job.timeout();
261        let payload = serde_json::to_value(job)
262            .map_err(|e| ScheduleError::Queue(QueueError::Serialization(e)))?;
263        
264        Ok(ScheduledJob {
265            id,
266            cron,
267            job_type,
268            payload,
269            priority: priority.unwrap_or_default(),
270            retry_strategy: retry_strategy.unwrap_or_default(),
271            timeout,
272            enabled: true,
273            description: None,
274            next_run,
275            last_run: None,
276            created_at: now,
277        })
278    }
279    
280    pub fn update_next_run(&mut self) {
282        let after = self.last_run.unwrap_or_else(Utc::now);
283        self.next_run = self.cron.next_run_time(after);
284    }
285    
286    pub fn should_run(&self) -> bool {
288        if !self.enabled {
289            return false;
290        }
291        
292        if let Some(next_run) = self.next_run {
293            next_run <= Utc::now()
294        } else {
295            false
296        }
297    }
298    
299    pub fn mark_executed(&mut self) {
301        self.last_run = Some(Utc::now());
302        self.update_next_run();
303    }
304    
305    pub fn create_job_entry(&self) -> QueueResult<JobEntry> {
307        JobEntry::new_with_job_type(
308            self.job_type.clone(),
309            self.payload.clone(),
310            Some(self.priority),
311            None, self.retry_strategy.max_attempts(),
313        )
314    }
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
319struct ScheduledJobWrapper {
320    job_type: String,
321    payload: serde_json::Value,
322    max_retries: u32,
323    timeout: Duration,
324}
325
326#[async_trait::async_trait]
327impl Job for ScheduledJobWrapper {
328    async fn execute(&self) -> crate::JobResult<()> {
329        Ok(())
331    }
332    
333    fn job_type(&self) -> &'static str {
334        "scheduled_job_wrapper"
337    }
338    
339    fn max_retries(&self) -> u32 {
340        self.max_retries
341    }
342    
343    fn timeout(&self) -> Duration {
344        self.timeout
345    }
346}
347
348pub struct JobScheduler<B: crate::QueueBackend> {
350    backend: std::sync::Arc<B>,
351    schedules: std::sync::Arc<parking_lot::RwLock<std::collections::HashMap<String, ScheduledJob>>>,
352    running: std::sync::Arc<std::sync::atomic::AtomicBool>,
353}
354
355impl<B: crate::QueueBackend + 'static> JobScheduler<B> {
356    pub fn new(backend: std::sync::Arc<B>) -> Self {
358        Self {
359            backend,
360            schedules: std::sync::Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
361            running: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
362        }
363    }
364    
365    pub fn add_schedule(&self, schedule: ScheduledJob) -> ScheduleResult<()> {
367        let mut schedules = self.schedules.write();
368        schedules.insert(schedule.id.clone(), schedule);
369        Ok(())
370    }
371    
372    pub fn remove_schedule(&self, id: &str) -> ScheduleResult<bool> {
374        let mut schedules = self.schedules.write();
375        Ok(schedules.remove(id).is_some())
376    }
377    
378    pub fn get_schedule(&self, id: &str) -> Option<ScheduledJob> {
380        let schedules = self.schedules.read();
381        schedules.get(id).cloned()
382    }
383    
384    pub fn list_schedules(&self) -> Vec<ScheduledJob> {
386        let schedules = self.schedules.read();
387        schedules.values().cloned().collect()
388    }
389    
390    pub fn set_schedule_enabled(&self, id: &str, enabled: bool) -> ScheduleResult<bool> {
392        let mut schedules = self.schedules.write();
393        if let Some(schedule) = schedules.get_mut(id) {
394            schedule.enabled = enabled;
395            Ok(true)
396        } else {
397            Ok(false)
398        }
399    }
400    
401    pub async fn start(&self) -> ScheduleResult<()> {
403        self.running.store(true, std::sync::atomic::Ordering::SeqCst);
404        
405        let backend = self.backend.clone();
406        let schedules = self.schedules.clone();
407        let running = self.running.clone();
408        
409        tokio::spawn(async move {
410            let mut interval = tokio::time::interval(Duration::from_secs(30)); while running.load(std::sync::atomic::Ordering::SeqCst) {
413                interval.tick().await;
414                
415                let mut due_schedules = Vec::new();
417                {
418                    let mut schedules_guard = schedules.write();
419                    for schedule in schedules_guard.values_mut() {
420                        if schedule.should_run() {
421                            schedule.mark_executed();
422                            due_schedules.push(schedule.clone());
423                        }
424                    }
425                }
426                
427                for schedule in due_schedules {
429                    if let Ok(job_entry) = schedule.create_job_entry() {
430                        if let Err(e) = backend.enqueue(job_entry).await {
431                            tracing::error!("Failed to enqueue scheduled job {}: {}", schedule.id, e);
432                        } else {
433                            tracing::info!("Enqueued scheduled job: {}", schedule.id);
434                        }
435                    }
436                }
437            }
438        });
439        
440        Ok(())
441    }
442    
443    pub fn stop(&self) {
445        self.running.store(false, std::sync::atomic::Ordering::SeqCst);
446    }
447    
448    pub fn is_running(&self) -> bool {
450        self.running.load(std::sync::atomic::Ordering::SeqCst)
451    }
452    
453    pub async fn get_dead_jobs(&self, limit: Option<usize>) -> QueueResult<Vec<crate::JobEntry>> {
455        self.backend.get_jobs_by_state(crate::JobState::Dead, limit).await
456    }
457    
458    pub async fn requeue_dead_job(&self, job_id: crate::JobId) -> QueueResult<bool> {
460        if let Some(job) = self.backend.get_job(job_id).await? {
461            if job.state() == &crate::JobState::Dead {
462                self.backend.requeue_job(job_id, job).await
464            } else {
465                Ok(false)
466            }
467        } else {
468            Ok(false)
469        }
470    }
471    
472    pub async fn clear_dead_jobs(&self) -> QueueResult<u64> {
474        self.backend.clear_jobs_by_state(crate::JobState::Dead).await
476    }
477}
478
479
480pub mod cron_presets {
482    use super::CronExpression;
483    
484    pub fn every_minute() -> CronExpression {
486        CronExpression::new("0 * * * * *").expect("Invalid 'every_minute' cron preset")
487    }
488    
489    pub fn every_5_minutes() -> CronExpression {
491        CronExpression::new("0 */5 * * * *").unwrap()
492    }
493    
494    pub fn every_15_minutes() -> CronExpression {
496        CronExpression::new("0 */15 * * * *").unwrap()
497    }
498    
499    pub fn every_30_minutes() -> CronExpression {
501        CronExpression::new("0 */30 * * * *").unwrap()
502    }
503    
504    pub fn hourly() -> CronExpression {
506        CronExpression::new("0 0 * * * *").unwrap()
507    }
508    
509    pub fn daily() -> CronExpression {
511        CronExpression::new("0 0 0 * * *").unwrap()
512    }
513    
514    pub fn weekly() -> CronExpression {
516        CronExpression::new("0 0 0 * * SUN").unwrap()
517    }
518    
519    pub fn monthly() -> CronExpression {
521        CronExpression::new("0 0 0 1 * *").unwrap()
522    }
523    
524    pub fn weekdays_at_9am() -> CronExpression {
526        CronExpression::new("0 0 9 * * 1-5").unwrap()
527    }
528    
529    pub fn custom(expression: &str) -> Result<CronExpression, super::ScheduleError> {
531        CronExpression::new(expression)
532    }
533}
534
535#[derive(Debug, Clone)]
537pub struct CancellationToken {
538    cancelled: Arc<std::sync::atomic::AtomicBool>,
539    notify: Arc<tokio::sync::Notify>,
540}
541
542impl CancellationToken {
543    pub fn new() -> Self {
545        Self {
546            cancelled: Arc::new(std::sync::atomic::AtomicBool::new(false)),
547            notify: Arc::new(tokio::sync::Notify::new()),
548        }
549    }
550    
551    pub fn cancel(&self) {
553        self.cancelled.store(true, std::sync::atomic::Ordering::SeqCst);
554        self.notify.notify_waiters();
555    }
556    
557    pub fn is_cancelled(&self) -> bool {
559        self.cancelled.load(std::sync::atomic::Ordering::SeqCst)
560    }
561    
562    pub async fn wait_for_cancellation(&self) {
564        if self.is_cancelled() {
565            return;
566        }
567        
568        self.notify.notified().await;
569    }
570    
571    pub async fn cancelled(&self) {
573        self.wait_for_cancellation().await;
574    }
575}
576
577impl Default for CancellationToken {
578    fn default() -> Self {
579        Self::new()
580    }
581}
582
583#[derive(Debug)]
585pub struct JobCancellationManager {
586    active_tokens: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, CancellationToken>>>,
587}
588
589impl JobCancellationManager {
590    pub fn new() -> Self {
592        Self {
593            active_tokens: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
594        }
595    }
596    
597    pub fn register_job(&self, job_id: crate::JobId) -> CancellationToken {
599        let token = CancellationToken::new();
600        self.active_tokens.write().insert(job_id, token.clone());
601        token
602    }
603    
604    pub fn cancel_job(&self, job_id: crate::JobId) -> bool {
606        if let Some(token) = self.active_tokens.read().get(&job_id) {
607            token.cancel();
608            true
609        } else {
610            false
611        }
612    }
613    
614    pub fn cancel_all(&self) {
616        let tokens = self.active_tokens.read();
617        for token in tokens.values() {
618            token.cancel();
619        }
620    }
621    
622    pub fn unregister_job(&self, job_id: crate::JobId) {
624        self.active_tokens.write().remove(&job_id);
625    }
626    
627    pub fn active_job_count(&self) -> usize {
629        self.active_tokens.read().len()
630    }
631    
632    pub fn active_jobs(&self) -> Vec<crate::JobId> {
634        self.active_tokens.read().keys().cloned().collect()
635    }
636}
637
638impl Default for JobCancellationManager {
639    fn default() -> Self {
640        Self::new()
641    }
642}
643
644#[async_trait::async_trait]
646pub trait CancellableJob: Job {
647    async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()>;
649}
650
651#[derive(Debug, Clone, Default, Serialize, Deserialize)]
653pub struct JobMetrics {
654    pub total_scheduled: u64,
656    pub total_executed: u64,
658    pub successful_jobs: u64,
660    pub failed_jobs: u64,
662    pub retried_jobs: u64,
664    pub timeout_jobs: u64,
666    pub cancelled_jobs: u64,
668    pub avg_execution_time_ms: f64,
670    pub min_execution_time_ms: u64,
672    pub max_execution_time_ms: u64,
674    pub jobs_by_priority: std::collections::HashMap<String, u64>,
676    pub jobs_by_type: std::collections::HashMap<String, u64>,
678    pub success_rate: f64,
680    pub avg_retry_attempts: f64,
682    pub last_reset: DateTime<Utc>,
684}
685
686impl JobMetrics {
687    pub fn new() -> Self {
689        Self {
690            last_reset: Utc::now(),
691            ..Default::default()
692        }
693    }
694    
695    pub fn record_scheduled(&mut self, job_type: &str, priority: Priority) {
697        self.total_scheduled += 1;
698        *self.jobs_by_type.entry(job_type.to_string()).or_insert(0) += 1;
699        *self.jobs_by_priority.entry(format!("{:?}", priority)).or_insert(0) += 1;
700    }
701    
702    pub fn record_execution_start(&mut self) {
704        self.total_executed += 1;
705    }
706    
707    pub fn record_success(&mut self, execution_time_ms: u64) {
709        self.successful_jobs += 1;
710        self.update_execution_time(execution_time_ms);
711        self.update_success_rate();
712    }
713    
714    pub fn record_failure(&mut self, execution_time_ms: u64, retry_attempts: u32) {
716        self.failed_jobs += 1;
717        self.update_execution_time(execution_time_ms);
718        self.update_success_rate();
719        self.update_retry_attempts(retry_attempts);
720    }
721    
722    pub fn record_retry(&mut self) {
724        self.retried_jobs += 1;
725    }
726    
727    pub fn record_timeout(&mut self, execution_time_ms: u64) {
729        self.timeout_jobs += 1;
730        self.update_execution_time(execution_time_ms);
731    }
732    
733    pub fn record_cancellation(&mut self, execution_time_ms: u64) {
735        self.cancelled_jobs += 1;
736        self.update_execution_time(execution_time_ms);
737    }
738    
739    pub fn reset(&mut self) {
741        *self = Self::new();
742    }
743    
744    fn update_execution_time(&mut self, execution_time_ms: u64) {
746        if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
747            self.min_execution_time_ms = execution_time_ms;
748        }
749        if execution_time_ms > self.max_execution_time_ms {
750            self.max_execution_time_ms = execution_time_ms;
751        }
752        
753        let completed_jobs = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
755        if completed_jobs > 0 {
756            let new_sample = execution_time_ms as f64;
757            self.avg_execution_time_ms += (new_sample - self.avg_execution_time_ms) / completed_jobs as f64;
759        }
760    }
761    
762    fn update_success_rate(&mut self) {
764        let total_completed = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
765        if total_completed > 0 {
766            self.success_rate = self.successful_jobs as f64 / total_completed as f64;
767        }
768    }
769    
770    fn update_retry_attempts(&mut self, attempts: u32) {
772        if self.failed_jobs > 0 {
773            let new_sample = attempts as f64;
774            self.avg_retry_attempts += (new_sample - self.avg_retry_attempts) / self.failed_jobs as f64;
776        }
777    }
778}
779
780#[derive(Debug)]
782pub struct JobMetricsCollector {
783    metrics: Arc<parking_lot::RwLock<JobMetrics>>,
784    active_executions: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, std::time::Instant>>>,
785}
786
787impl JobMetricsCollector {
788    pub fn new() -> Self {
790        Self {
791            metrics: Arc::new(parking_lot::RwLock::new(JobMetrics::new())),
792            active_executions: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
793        }
794    }
795    
796    pub fn record_job_scheduled(&self, job_type: &str, priority: Priority) {
798        let mut metrics = self.metrics.write();
799        metrics.record_scheduled(job_type, priority);
800    }
801    
802    pub fn record_execution_start(&self, job_id: crate::JobId) {
804        let mut metrics = self.metrics.write();
805        let mut executions = self.active_executions.write();
806        
807        metrics.record_execution_start();
808        executions.insert(job_id, std::time::Instant::now());
809    }
810    
811    pub fn record_job_success(&self, job_id: crate::JobId) {
813        let execution_time = self.get_and_remove_execution_time(job_id);
814        let mut metrics = self.metrics.write();
815        metrics.record_success(execution_time);
816    }
817    
818    pub fn record_job_failure(&self, job_id: crate::JobId, retry_attempts: u32) {
820        let execution_time = self.get_and_remove_execution_time(job_id);
821        let mut metrics = self.metrics.write();
822        metrics.record_failure(execution_time, retry_attempts);
823    }
824    
825    pub fn record_job_retry(&self, _job_id: crate::JobId) {
827        let mut metrics = self.metrics.write();
829        metrics.record_retry();
830    }
831    
832    pub fn record_job_timeout(&self, job_id: crate::JobId) {
834        let execution_time = self.get_and_remove_execution_time(job_id);
835        let mut metrics = self.metrics.write();
836        metrics.record_timeout(execution_time);
837    }
838    
839    pub fn record_job_cancellation(&self, job_id: crate::JobId) {
841        let execution_time = self.get_and_remove_execution_time(job_id);
842        let mut metrics = self.metrics.write();
843        metrics.record_cancellation(execution_time);
844    }
845    
846    pub fn get_metrics(&self) -> JobMetrics {
848        self.metrics.read().clone()
849    }
850    
851    pub fn reset_metrics(&self) {
853        let mut metrics = self.metrics.write();
854        let mut executions = self.active_executions.write();
855        
856        metrics.reset();
857        executions.clear();
858    }
859    
860    fn get_and_remove_execution_time(&self, job_id: crate::JobId) -> u64 {
862        let mut executions = self.active_executions.write();
863        if let Some(start_time) = executions.remove(&job_id) {
864            start_time.elapsed().as_millis() as u64
865        } else {
866            0
867        }
868    }
869    
870    pub fn active_executions_count(&self) -> usize {
872        self.active_executions.read().len()
873    }
874}
875
876impl Default for JobMetricsCollector {
877    fn default() -> Self {
878        Self::new()
879    }
880}
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use std::time::Duration;
886    use crate::{MemoryBackend, QueueConfig};
887    
888    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
889    struct TestJob {
890        message: String,
891    }
892    
893    #[async_trait::async_trait]
894    impl Job for TestJob {
895        async fn execute(&self) -> crate::JobResult<()> {
896            println!("Executing test job: {}", self.message);
897            Ok(())
898        }
899        
900        fn job_type(&self) -> &'static str {
901            "test_job"
902        }
903    }
904    
905    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
906    struct CancellableTestJob {
907        message: String,
908        sleep_duration: Duration,
909    }
910    
911    #[async_trait::async_trait]
912    impl Job for CancellableTestJob {
913        async fn execute(&self) -> crate::JobResult<()> {
914            tokio::time::sleep(self.sleep_duration).await;
915            Ok(())
916        }
917        
918        fn job_type(&self) -> &'static str {
919            "cancellable_test_job"
920        }
921    }
922    
923    #[async_trait::async_trait]
924    impl CancellableJob for CancellableTestJob {
925        async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()> {
926            tokio::select! {
927                _ = tokio::time::sleep(self.sleep_duration) => {
928                    println!("Job completed: {}", self.message);
929                    Ok(())
930                }
931                _ = token.cancelled() => {
932                    println!("Job cancelled: {}", self.message);
933                    Err("Job was cancelled".into())
934                }
935            }
936        }
937    }
938    
939    #[test]
940    fn test_cron_expression_validation() {
941        assert!(CronExpression::new("0 0 0 * * *").is_ok()); assert!(CronExpression::new("0 */5 * * * *").is_ok()); assert!(CronExpression::new("0 0 9-17 * * 1-5").is_ok()); assert!(CronExpression::new("invalid").is_err());
948        assert!(CronExpression::new("* * * * *").is_err()); }
950    
951    #[test]
952    fn test_cron_next_run_time() {
953        let cron = CronExpression::new("0 0 0 * * *").unwrap(); let now = Utc::now();
955        let next = cron.next_run_time(now);
956        
957        assert!(next.is_some());
958        assert!(next.unwrap() > now);
959    }
960    
961    #[test]
962    fn test_cron_presets() {
963        assert!(cron_presets::every_minute().next_run_time(Utc::now()).is_some());
965        assert!(cron_presets::hourly().next_run_time(Utc::now()).is_some());
966        assert!(cron_presets::daily().next_run_time(Utc::now()).is_some());
967        assert!(cron_presets::weekly().next_run_time(Utc::now()).is_some());
968        assert!(cron_presets::monthly().next_run_time(Utc::now()).is_some());
969        assert!(cron_presets::weekdays_at_9am().next_run_time(Utc::now()).is_some());
970    }
971    
972    #[test]
973    fn test_retry_strategy_exponential() {
974        let strategy = RetryStrategy::Exponential {
975            initial_delay: Duration::from_secs(1),
976            multiplier: 2.0,
977            max_delay: Duration::from_secs(60),
978            max_attempts: 3,
979            jitter: false,
980        };
981        
982        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
983        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(2)));
984        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(4)));
985        assert_eq!(strategy.delay_for_attempt(3), None); assert_eq!(strategy.max_attempts(), 3);
987    }
988    
989    #[test]
990    fn test_retry_strategy_linear() {
991        let strategy = RetryStrategy::Linear {
992            initial_delay: Duration::from_secs(5),
993            increment: Duration::from_secs(10),
994            max_delay: Duration::from_secs(60),
995            max_attempts: 4,
996        };
997        
998        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(5)));
999        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(15)));
1000        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(25)));
1001        assert_eq!(strategy.delay_for_attempt(3), Some(Duration::from_secs(35)));
1002        assert_eq!(strategy.delay_for_attempt(4), None); assert_eq!(strategy.max_attempts(), 4);
1004    }
1005    
1006    #[test]
1007    fn test_retry_strategy_fixed() {
1008        let strategy = RetryStrategy::Fixed {
1009            delay: Duration::from_secs(10),
1010            max_attempts: 2,
1011        };
1012        
1013        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(10)));
1014        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(10)));
1015        assert_eq!(strategy.delay_for_attempt(2), None); assert_eq!(strategy.max_attempts(), 2);
1017    }
1018    
1019    #[test]
1020    fn test_retry_strategy_custom() {
1021        let strategy = RetryStrategy::Custom {
1022            delays: vec![
1023                Duration::from_secs(1),
1024                Duration::from_secs(5),
1025                Duration::from_secs(30),
1026            ],
1027        };
1028        
1029        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
1030        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(5)));
1031        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(30)));
1032        assert_eq!(strategy.delay_for_attempt(3), None); assert_eq!(strategy.max_attempts(), 3);
1034    }
1035    
1036    #[test]
1037    fn test_scheduled_job_creation() {
1038        let job = TestJob {
1039            message: "Hello, World!".to_string(),
1040        };
1041        
1042        let scheduled = ScheduledJob::new(
1043            "test_schedule".to_string(),
1044            "0 0 0 * * *", job,
1046            Some(Priority::High),
1047            None,
1048        ).unwrap();
1049        
1050        assert_eq!(scheduled.id, "test_schedule");
1051        assert_eq!(scheduled.job_type, "test_job");
1052        assert_eq!(scheduled.priority, Priority::High);
1053        assert!(scheduled.enabled);
1054        assert!(scheduled.next_run.is_some());
1055    }
1056    
1057    #[tokio::test]
1058    async fn test_job_scheduler_basic() {
1059        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1060        let scheduler = JobScheduler::new(backend);
1061        
1062        let job = TestJob {
1063            message: "Scheduled job".to_string(),
1064        };
1065        
1066        let scheduled = ScheduledJob::new(
1067            "test_schedule".to_string(),
1068            "0 * * * * *", job,
1070            Some(Priority::Normal),
1071            None,
1072        ).unwrap();
1073        
1074        scheduler.add_schedule(scheduled).unwrap();
1075        
1076        let schedules = scheduler.list_schedules();
1077        assert_eq!(schedules.len(), 1);
1078        assert_eq!(schedules[0].id, "test_schedule");
1079        
1080        let retrieved = scheduler.get_schedule("test_schedule");
1082        assert!(retrieved.is_some());
1083        assert_eq!(retrieved.unwrap().id, "test_schedule");
1084        
1085        assert!(scheduler.remove_schedule("test_schedule").unwrap());
1087        assert!(scheduler.get_schedule("test_schedule").is_none());
1088    }
1089    
1090    #[tokio::test]
1091    async fn test_cancellation_token() {
1092        let token = CancellationToken::new();
1093        
1094        assert!(!token.is_cancelled());
1096        
1097        token.cancel();
1099        assert!(token.is_cancelled());
1100        
1101        token.wait_for_cancellation().await;
1103        
1104        let cloned = token.clone();
1106        assert!(cloned.is_cancelled());
1107        cloned.wait_for_cancellation().await; }
1109    
1110    #[tokio::test]
1111    async fn test_cancellation_token_async_notification() {
1112        let token = CancellationToken::new();
1113        let token_clone = token.clone();
1114        
1115        let wait_task = tokio::spawn(async move {
1117            token_clone.wait_for_cancellation().await;
1118            "cancelled"
1119        });
1120        
1121        tokio::time::sleep(Duration::from_millis(1)).await;
1123        
1124        assert!(!wait_task.is_finished());
1126        
1127        token.cancel();
1129        
1130        let result = tokio::time::timeout(Duration::from_millis(100), wait_task).await;
1132        assert!(result.is_ok());
1133        assert_eq!(result.unwrap().unwrap(), "cancelled");
1134    }
1135    
1136    #[test]
1137    fn test_job_cancellation_manager() {
1138        let manager = JobCancellationManager::new();
1139        let job_id = crate::JobId::new_v4();
1140        
1141        let token = manager.register_job(job_id);
1143        assert_eq!(manager.active_job_count(), 1);
1144        assert!(manager.active_jobs().contains(&job_id));
1145        
1146        assert!(!token.is_cancelled());
1148        
1149        assert!(manager.cancel_job(job_id));
1151        assert!(token.is_cancelled());
1152        
1153        manager.unregister_job(job_id);
1155        assert_eq!(manager.active_job_count(), 0);
1156        
1157        assert!(!manager.cancel_job(job_id));
1159    }
1160    
1161    #[test]
1162    fn test_job_metrics() {
1163        let mut metrics = JobMetrics::new();
1164        
1165        metrics.record_scheduled("test_job", Priority::High);
1167        metrics.record_scheduled("test_job", Priority::Normal);
1168        metrics.record_scheduled("email_job", Priority::High);
1169        
1170        assert_eq!(metrics.total_scheduled, 3);
1171        assert_eq!(*metrics.jobs_by_type.get("test_job").unwrap(), 2);
1172        assert_eq!(*metrics.jobs_by_type.get("email_job").unwrap(), 1);
1173        assert_eq!(*metrics.jobs_by_priority.get("High").unwrap(), 2);
1174        assert_eq!(*metrics.jobs_by_priority.get("Normal").unwrap(), 1);
1175        
1176        metrics.record_execution_start();
1178        metrics.record_execution_start();
1179        assert_eq!(metrics.total_executed, 2);
1180        
1181        metrics.record_success(100); assert_eq!(metrics.successful_jobs, 1);
1184        assert_eq!(metrics.min_execution_time_ms, 100);
1185        assert_eq!(metrics.max_execution_time_ms, 100);
1186        assert_eq!(metrics.avg_execution_time_ms, 100.0); metrics.record_failure(200, 2); assert_eq!(metrics.failed_jobs, 1);
1191        assert_eq!(metrics.avg_retry_attempts, 2.0);
1192        assert_eq!(metrics.max_execution_time_ms, 200);
1193        assert_eq!(metrics.avg_execution_time_ms, 150.0); assert_eq!(metrics.success_rate, 0.5); }
1198    
1199    #[test]
1200    fn test_job_metrics_collector() {
1201        let collector = JobMetricsCollector::new();
1202        let job_id = crate::JobId::new_v4();
1203        
1204        collector.record_job_scheduled("test_job", Priority::High);
1206        
1207        collector.record_execution_start(job_id);
1209        assert_eq!(collector.active_executions_count(), 1);
1210        
1211        std::thread::sleep(Duration::from_millis(10)); collector.record_job_success(job_id);
1214        assert_eq!(collector.active_executions_count(), 0);
1215        
1216        let metrics = collector.get_metrics();
1217        assert_eq!(metrics.total_scheduled, 1);
1218        assert_eq!(metrics.total_executed, 1);
1219        assert_eq!(metrics.successful_jobs, 1);
1220        assert_eq!(metrics.success_rate, 1.0);
1221        assert!(metrics.min_execution_time_ms >= 10);
1222    }
1223    
1224    #[tokio::test]
1225    async fn test_atomic_clear_dead_jobs() {
1226        use crate::{MemoryBackend, QueueConfig, JobEntry, Priority, QueueBackend};
1227        
1228        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1229        let scheduler = JobScheduler::new(backend.clone());
1230        
1231        let job1 = TestJob { message: "job1".to_string() };
1233        let job2 = TestJob { message: "job2".to_string() };
1234        let job3 = TestJob { message: "job3".to_string() };
1235        
1236        let mut entry1 = JobEntry::new(job1, Some(Priority::Normal), None).unwrap();
1237        let mut entry2 = JobEntry::new(job2, Some(Priority::High), None).unwrap();
1238        let mut entry3 = JobEntry::new(job3, Some(Priority::Low), None).unwrap();
1239        
1240        for _ in 0..=3 {
1242            entry1.mark_failed("Test failure".to_string());
1243            entry2.mark_failed("Test failure".to_string());
1244            entry3.mark_failed("Test failure".to_string());
1245        }
1246        
1247        backend.enqueue(entry1).await.unwrap();
1249        backend.enqueue(entry2).await.unwrap();
1250        backend.enqueue(entry3).await.unwrap();
1251        
1252        let dead_jobs_before = scheduler.get_dead_jobs(None).await.unwrap();
1254        assert_eq!(dead_jobs_before.len(), 3);
1255        
1256        let cleared_count = scheduler.clear_dead_jobs().await.unwrap();
1258        assert_eq!(cleared_count, 3);
1259        
1260        let dead_jobs_after = scheduler.get_dead_jobs(None).await.unwrap();
1262        assert_eq!(dead_jobs_after.len(), 0);
1263        
1264        let stats = backend.stats().await.unwrap();
1266        assert_eq!(stats.dead_jobs, 0);
1267    }
1268    
1269    #[tokio::test]
1270    async fn test_atomic_requeue_dead_job() {
1271        use crate::{MemoryBackend, QueueConfig, JobEntry, JobState, Priority, QueueBackend};
1272        
1273        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
1274        let scheduler = JobScheduler::new(backend.clone());
1275        
1276        let job = TestJob { message: "dead job".to_string() };
1278        let mut entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
1279        let job_id = entry.id();
1280        
1281        for _ in 0..=3 {
1283            entry.mark_failed("Test failure".to_string());
1284        }
1285        backend.enqueue(entry).await.unwrap();
1286        
1287        let stats_before = backend.stats().await.unwrap();
1289        assert_eq!(stats_before.dead_jobs, 1);
1290        assert_eq!(stats_before.pending_jobs, 0);
1291        
1292        let requeued = scheduler.requeue_dead_job(job_id).await.unwrap();
1294        assert!(requeued);
1295        
1296        let stats_after = backend.stats().await.unwrap();
1298        assert_eq!(stats_after.dead_jobs, 0);
1299        assert_eq!(stats_after.pending_jobs, 1);
1300        
1301        let job_entry = backend.get_job(job_id).await.unwrap().unwrap();
1303        assert_eq!(job_entry.state(), &JobState::Pending);
1304        assert_eq!(job_entry.attempts(), 0);
1305    }
1306}