use std::time::Duration;
use std::str::FromStr;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use cron::Schedule;
use serde::{Serialize, Deserialize};
use thiserror::Error;
use crate::{Job, JobEntry, Priority, QueueResult, QueueError};

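/// Errors returned by the scheduling layer, including cron parsing failures
/// and errors propagated from the underlying queue.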
#[derive(Error, Debug)]
pub enum ScheduleError {
    #[error("Invalid cron expression: {0}")]
    InvalidCron(String),

    #[error("Schedule not found: {0}")]
    ScheduleNotFound(String),

    #[error("Invalid retry configuration: {0}")]
    InvalidRetryConfig(String),

    #[error("Queue error: {0}")]
    Queue(#[from] QueueError),
}

pub type ScheduleResult<T> = Result<T, ScheduleError>;

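/// A validated cron expression.
///
/// Wraps the raw expression string together with its parsed [`cron::Schedule`].
/// The `cron` crate uses seconds-first syntax, so `"0 0 0 * * *"` means
/// "every day at midnight" and five-field expressions are rejected. The parsed
/// schedule is not serialized; it is rebuilt and re-validated on deserialization.
///
/// # Examples
///
/// A minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// let cron = CronExpression::new("0 */5 * * * *")?; // every five minutes
/// let next = cron.next_run_time(chrono::Utc::now());
/// assert!(next.is_some());
/// ```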
#[derive(Debug, Clone, Serialize)]
pub struct CronExpression {
    expression: String,
    #[serde(skip)]
    schedule: Option<Schedule>,
}

impl CronExpression {
    pub fn new(expression: &str) -> ScheduleResult<Self> {
        let schedule = Schedule::from_str(expression)
            .map_err(|e| ScheduleError::InvalidCron(format!("{}: {}", expression, e)))?;

        Ok(CronExpression {
            expression: expression.to_string(),
            schedule: Some(schedule),
        })
    }

    pub fn expression(&self) -> &str {
        &self.expression
    }

    pub fn next_run_time(&self, after: DateTime<Utc>) -> Option<DateTime<Utc>> {
        self.schedule.as_ref()?.after(&after).next()
    }

    pub fn next_run_times(&self, after: DateTime<Utc>, count: usize) -> Vec<DateTime<Utc>> {
        self.schedule.as_ref()
            .map(|s| s.after(&after).take(count).collect())
            .unwrap_or_default()
    }

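    /// Returns `true` when this expression has an occurrence within roughly
    /// 30 seconds of `at`. The search starts one second before `at` so an
    /// occurrence that falls exactly on `at` is not skipped.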
    pub fn should_run(&self, at: DateTime<Utc>) -> bool {
        if let Some(next) = self.next_run_time(at - chrono::Duration::seconds(1)) {
            (next - at).num_seconds().abs() < 30
        } else {
            false
        }
    }
}

impl<'de> Deserialize<'de> for CronExpression {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[derive(Deserialize)]
        struct CronExpressionData {
            expression: String,
        }

        let data = CronExpressionData::deserialize(deserializer)?;
        CronExpression::new(&data.expression)
            .map_err(|e| serde::de::Error::custom(e.to_string()))
    }
}

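/// Backoff policy applied when a job fails and is retried.
///
/// Every variant bounds the number of attempts; `delay_for_attempt` returns
/// `None` once the attempts are exhausted.
///
/// # Examples
///
/// A minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// let strategy = RetryStrategy::Exponential {
///     initial_delay: Duration::from_secs(1),
///     multiplier: 2.0,
///     max_delay: Duration::from_secs(60),
///     max_attempts: 3,
///     jitter: false,
/// };
/// assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(2)));
/// assert_eq!(strategy.delay_for_attempt(3), None);
/// ```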
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryStrategy {
    Fixed {
        delay: Duration,
        max_attempts: u32,
    },
    Exponential {
        initial_delay: Duration,
        multiplier: f64,
        max_delay: Duration,
        max_attempts: u32,
        jitter: bool,
    },
    Linear {
        initial_delay: Duration,
        increment: Duration,
        max_delay: Duration,
        max_attempts: u32,
    },
    Custom {
        delays: Vec<Duration>,
    },
}

impl Default for RetryStrategy {
    fn default() -> Self {
        RetryStrategy::Exponential {
            initial_delay: Duration::from_secs(1),
            multiplier: 2.0,
            max_delay: Duration::from_secs(300),
            max_attempts: 3,
            jitter: true,
        }
    }
}

impl RetryStrategy {
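    /// Returns the delay to wait before the given zero-based retry attempt,
    /// or `None` once the strategy's maximum number of attempts is exhausted.
    /// The exponential variant optionally applies +/-25% jitter.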
    pub fn delay_for_attempt(&self, attempt: u32) -> Option<Duration> {
        use rand::Rng;

        match self {
            RetryStrategy::Fixed { delay, max_attempts } => {
                if attempt < *max_attempts {
                    Some(*delay)
                } else {
                    None
                }
            }
            RetryStrategy::Exponential {
                initial_delay,
                multiplier,
                max_delay,
                max_attempts,
                jitter,
            } => {
                if attempt >= *max_attempts {
                    return None;
                }

                let delay = initial_delay.as_secs_f64() * multiplier.powi(attempt as i32);
                let delay = delay.min(max_delay.as_secs_f64());

                let delay = if *jitter {
                    let mut rng = rand::thread_rng();
                    let jitter_factor = rng.gen_range(0.75..1.25);
                    delay * jitter_factor
                } else {
                    delay
                };

                Some(Duration::from_secs_f64(delay))
            }
            RetryStrategy::Linear {
                initial_delay,
                increment,
                max_delay,
                max_attempts,
            } => {
                if attempt >= *max_attempts {
                    return None;
                }

                let delay = initial_delay.as_secs() + (increment.as_secs() * attempt as u64);
                let delay = delay.min(max_delay.as_secs());
                Some(Duration::from_secs(delay))
            }
            RetryStrategy::Custom { delays } => {
                delays.get(attempt as usize).copied()
            }
        }
    }

    pub fn max_attempts(&self) -> u32 {
        match self {
            RetryStrategy::Fixed { max_attempts, .. } => *max_attempts,
            RetryStrategy::Exponential { max_attempts, .. } => *max_attempts,
            RetryStrategy::Linear { max_attempts, .. } => *max_attempts,
            RetryStrategy::Custom { delays } => delays.len() as u32,
        }
    }
}

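/// A job definition bound to a cron schedule.
///
/// Stores the serialized job payload plus the metadata needed to enqueue it
/// (priority, retry strategy, timeout) and the bookkeeping fields the
/// scheduler updates (`next_run`, `last_run`).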
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledJob {
    pub id: String,
    pub cron: CronExpression,
    pub job_type: String,
    pub payload: serde_json::Value,
    pub priority: Priority,
    pub retry_strategy: RetryStrategy,
    pub timeout: Duration,
    pub enabled: bool,
    pub description: Option<String>,
    pub next_run: Option<DateTime<Utc>>,
    pub last_run: Option<DateTime<Utc>>,
    pub created_at: DateTime<Utc>,
}

impl ScheduledJob {
    pub fn new<T: Job>(
        id: String,
        cron_expr: &str,
        job: T,
        priority: Option<Priority>,
        retry_strategy: Option<RetryStrategy>,
    ) -> ScheduleResult<Self> {
        let cron = CronExpression::new(cron_expr)?;
        let now = Utc::now();
        let next_run = cron.next_run_time(now);

        let job_type = job.job_type().to_string();
        let timeout = job.timeout();
        let payload = serde_json::to_value(job)
            .map_err(|e| ScheduleError::Queue(QueueError::Serialization(e)))?;

        Ok(ScheduledJob {
            id,
            cron,
            job_type,
            payload,
            priority: priority.unwrap_or_default(),
            retry_strategy: retry_strategy.unwrap_or_default(),
            timeout,
            enabled: true,
            description: None,
            next_run,
            last_run: None,
            created_at: now,
        })
    }

    pub fn update_next_run(&mut self) {
        let after = self.last_run.unwrap_or_else(Utc::now);
        self.next_run = self.cron.next_run_time(after);
    }

    pub fn should_run(&self) -> bool {
        if !self.enabled {
            return false;
        }

        if let Some(next_run) = self.next_run {
            next_run <= Utc::now()
        } else {
            false
        }
    }

    pub fn mark_executed(&mut self) {
        self.last_run = Some(Utc::now());
        self.update_next_run();
    }

    pub fn create_job_entry(&self) -> QueueResult<JobEntry> {
        JobEntry::new_with_job_type(
            self.job_type.clone(),
            self.payload.clone(),
            Some(self.priority),
            None,
            self.retry_strategy.max_attempts(),
        )
    }
}

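/// Internal wrapper that carries a scheduled payload through the [`Job`]
/// trait. Its `execute` implementation is a no-op placeholder.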
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ScheduledJobWrapper {
    job_type: String,
    payload: serde_json::Value,
    max_retries: u32,
    timeout: Duration,
}

#[async_trait::async_trait]
impl Job for ScheduledJobWrapper {
    async fn execute(&self) -> crate::JobResult<()> {
        Ok(())
    }

    fn job_type(&self) -> &'static str {
        "scheduled_job_wrapper"
    }

    fn max_retries(&self) -> u32 {
        self.max_retries
    }

    fn timeout(&self) -> Duration {
        self.timeout
    }
}

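/// Drives [`ScheduledJob`]s against a queue backend.
///
/// Schedules are kept in an in-memory map guarded by a `parking_lot::RwLock`;
/// a background task polls them and enqueues due jobs on the backend.
///
/// # Examples
///
/// A minimal sketch using the in-memory backend (not compiled as a doctest):
///
/// ```ignore
/// let backend = Arc::new(MemoryBackend::new(QueueConfig::default()));
/// let scheduler = JobScheduler::new(backend);
/// scheduler.add_schedule(ScheduledJob::new(
///     "nightly_report".to_string(), // hypothetical schedule id
///     "0 0 0 * * *",                // every day at midnight
///     my_job,                       // any type implementing `Job`
///     None,
///     None,
/// )?)?;
/// scheduler.start().await?;
/// ```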
pub struct JobScheduler<B: crate::QueueBackend> {
    backend: std::sync::Arc<B>,
    schedules: std::sync::Arc<parking_lot::RwLock<std::collections::HashMap<String, ScheduledJob>>>,
    running: std::sync::Arc<std::sync::atomic::AtomicBool>,
}

impl<B: crate::QueueBackend + 'static> JobScheduler<B> {
    pub fn new(backend: std::sync::Arc<B>) -> Self {
        Self {
            backend,
            schedules: std::sync::Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
            running: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
        }
    }

    pub fn add_schedule(&self, schedule: ScheduledJob) -> ScheduleResult<()> {
        let mut schedules = self.schedules.write();
        schedules.insert(schedule.id.clone(), schedule);
        Ok(())
    }

    pub fn remove_schedule(&self, id: &str) -> ScheduleResult<bool> {
        let mut schedules = self.schedules.write();
        Ok(schedules.remove(id).is_some())
    }

    pub fn get_schedule(&self, id: &str) -> Option<ScheduledJob> {
        let schedules = self.schedules.read();
        schedules.get(id).cloned()
    }

    pub fn list_schedules(&self) -> Vec<ScheduledJob> {
        let schedules = self.schedules.read();
        schedules.values().cloned().collect()
    }

    pub fn set_schedule_enabled(&self, id: &str, enabled: bool) -> ScheduleResult<bool> {
        let mut schedules = self.schedules.write();
        if let Some(schedule) = schedules.get_mut(id) {
            schedule.enabled = enabled;
            Ok(true)
        } else {
            Ok(false)
        }
    }

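    /// Starts the scheduler's background loop.
    ///
    /// A spawned Tokio task ticks every 30 seconds, marks due schedules as
    /// executed while holding the write lock, then enqueues them on the
    /// backend outside the lock. Call [`stop`](Self::stop) to end the loop.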
    pub async fn start(&self) -> ScheduleResult<()> {
        self.running.store(true, std::sync::atomic::Ordering::SeqCst);

        let backend = self.backend.clone();
        let schedules = self.schedules.clone();
        let running = self.running.clone();

        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            while running.load(std::sync::atomic::Ordering::SeqCst) {
                interval.tick().await;

                let mut due_schedules = Vec::new();
                {
                    let mut schedules_guard = schedules.write();
                    for schedule in schedules_guard.values_mut() {
                        if schedule.should_run() {
                            schedule.mark_executed();
                            due_schedules.push(schedule.clone());
                        }
                    }
                }

                for schedule in due_schedules {
                    if let Ok(job_entry) = schedule.create_job_entry() {
                        if let Err(e) = backend.enqueue(job_entry).await {
                            tracing::error!("Failed to enqueue scheduled job {}: {}", schedule.id, e);
                        } else {
                            tracing::info!("Enqueued scheduled job: {}", schedule.id);
                        }
                    }
                }
            }
        });

        Ok(())
    }

    pub fn stop(&self) {
        self.running.store(false, std::sync::atomic::Ordering::SeqCst);
    }

    pub fn is_running(&self) -> bool {
        self.running.load(std::sync::atomic::Ordering::SeqCst)
    }

    pub async fn get_dead_jobs(&self, limit: Option<usize>) -> QueueResult<Vec<crate::JobEntry>> {
        self.backend.get_jobs_by_state(crate::JobState::Dead, limit).await
    }

    pub async fn requeue_dead_job(&self, job_id: crate::JobId) -> QueueResult<bool> {
        if let Some(job) = self.backend.get_job(job_id).await? {
            if job.state() == &crate::JobState::Dead {
                self.backend.requeue_job(job_id, job).await
            } else {
                Ok(false)
            }
        } else {
            Ok(false)
        }
    }

    pub async fn clear_dead_jobs(&self) -> QueueResult<u64> {
        self.backend.clear_jobs_by_state(crate::JobState::Dead).await
    }
}

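/// Ready-made [`CronExpression`] values for common schedules.
///
/// All presets use the seconds-first syntax expected by the `cron` crate and
/// are fixed, valid strings, so the `expect`/`unwrap` calls cannot fail.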
pub mod cron_presets {
    use super::CronExpression;

    pub fn every_minute() -> CronExpression {
        CronExpression::new("0 * * * * *").expect("Invalid 'every_minute' cron preset")
    }

    pub fn every_5_minutes() -> CronExpression {
        CronExpression::new("0 */5 * * * *").unwrap()
    }

    pub fn every_15_minutes() -> CronExpression {
        CronExpression::new("0 */15 * * * *").unwrap()
    }

    pub fn every_30_minutes() -> CronExpression {
        CronExpression::new("0 */30 * * * *").unwrap()
    }

    pub fn hourly() -> CronExpression {
        CronExpression::new("0 0 * * * *").unwrap()
    }

    pub fn daily() -> CronExpression {
        CronExpression::new("0 0 0 * * *").unwrap()
    }

    pub fn weekly() -> CronExpression {
        CronExpression::new("0 0 0 * * SUN").unwrap()
    }

    pub fn monthly() -> CronExpression {
        CronExpression::new("0 0 0 1 * *").unwrap()
    }

    pub fn weekdays_at_9am() -> CronExpression {
        CronExpression::new("0 0 9 * * 1-5").unwrap()
    }

    pub fn custom(expression: &str) -> Result<CronExpression, super::ScheduleError> {
        CronExpression::new(expression)
    }
}

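/// A clonable token used to signal cooperative cancellation to running jobs.
///
/// Clones share the same state: cancelling any clone wakes every task waiting
/// on [`wait_for_cancellation`](CancellationToken::wait_for_cancellation).
///
/// # Examples
///
/// A minimal sketch (not compiled as a doctest):
///
/// ```ignore
/// tokio::select! {
///     _ = do_work() => { /* finished normally; `do_work` is a placeholder */ }
///     _ = token.cancelled() => { /* stop early and clean up */ }
/// }
/// ```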
#[derive(Debug, Clone)]
pub struct CancellationToken {
    cancelled: Arc<std::sync::atomic::AtomicBool>,
    notify: Arc<tokio::sync::Notify>,
}

impl CancellationToken {
    pub fn new() -> Self {
        Self {
            cancelled: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            notify: Arc::new(tokio::sync::Notify::new()),
        }
    }

    pub fn cancel(&self) {
        self.cancelled.store(true, std::sync::atomic::Ordering::SeqCst);
        self.notify.notify_waiters();
    }

    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(std::sync::atomic::Ordering::SeqCst)
    }

    pub async fn wait_for_cancellation(&self) {
        // Register with the `Notify` before checking the flag so a `cancel()`
        // that lands between the check and the await cannot be missed
        // (`notify_waiters` only wakes already-registered waiters).
        let notified = self.notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();

        if self.is_cancelled() {
            return;
        }

        notified.await;
    }

    pub async fn cancelled(&self) {
        self.wait_for_cancellation().await;
    }
}

impl Default for CancellationToken {
    fn default() -> Self {
        Self::new()
    }
}

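/// Tracks a [`CancellationToken`] per active job so individual jobs (or all
/// of them) can be cancelled by [`JobId`](crate::JobId).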
#[derive(Debug)]
pub struct JobCancellationManager {
    active_tokens: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, CancellationToken>>>,
}

impl JobCancellationManager {
    pub fn new() -> Self {
        Self {
            active_tokens: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
        }
    }

    pub fn register_job(&self, job_id: crate::JobId) -> CancellationToken {
        let token = CancellationToken::new();
        self.active_tokens.write().insert(job_id, token.clone());
        token
    }

    pub fn cancel_job(&self, job_id: crate::JobId) -> bool {
        if let Some(token) = self.active_tokens.read().get(&job_id) {
            token.cancel();
            true
        } else {
            false
        }
    }

    pub fn cancel_all(&self) {
        let tokens = self.active_tokens.read();
        for token in tokens.values() {
            token.cancel();
        }
    }

    pub fn unregister_job(&self, job_id: crate::JobId) {
        self.active_tokens.write().remove(&job_id);
    }

    pub fn active_job_count(&self) -> usize {
        self.active_tokens.read().len()
    }

    pub fn active_jobs(&self) -> Vec<crate::JobId> {
        self.active_tokens.read().keys().cloned().collect()
    }
}

impl Default for JobCancellationManager {
    fn default() -> Self {
        Self::new()
    }
}

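/// Extension of [`Job`] for jobs that can observe a [`CancellationToken`]
/// while running, typically by selecting between the work future and
/// `token.cancelled()`.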
#[async_trait::async_trait]
pub trait CancellableJob: Job {
    async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()>;
}

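/// Aggregated counters and timing statistics for scheduled and executed jobs.
///
/// Averages are maintained incrementally: each new sample `x` updates the
/// running mean `m` over `n` completed jobs as `m += (x - m) / n`.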
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct JobMetrics {
    pub total_scheduled: u64,
    pub total_executed: u64,
    pub successful_jobs: u64,
    pub failed_jobs: u64,
    pub retried_jobs: u64,
    pub timeout_jobs: u64,
    pub cancelled_jobs: u64,
    pub avg_execution_time_ms: f64,
    pub min_execution_time_ms: u64,
    pub max_execution_time_ms: u64,
    pub jobs_by_priority: std::collections::HashMap<String, u64>,
    pub jobs_by_type: std::collections::HashMap<String, u64>,
    pub success_rate: f64,
    pub avg_retry_attempts: f64,
    pub last_reset: DateTime<Utc>,
}

impl JobMetrics {
    pub fn new() -> Self {
        Self {
            last_reset: Utc::now(),
            ..Default::default()
        }
    }

    pub fn record_scheduled(&mut self, job_type: &str, priority: Priority) {
        self.total_scheduled += 1;
        *self.jobs_by_type.entry(job_type.to_string()).or_insert(0) += 1;
        *self.jobs_by_priority.entry(format!("{:?}", priority)).or_insert(0) += 1;
    }

    pub fn record_execution_start(&mut self) {
        self.total_executed += 1;
    }

    pub fn record_success(&mut self, execution_time_ms: u64) {
        self.successful_jobs += 1;
        self.update_execution_time(execution_time_ms);
        self.update_success_rate();
    }

    pub fn record_failure(&mut self, execution_time_ms: u64, retry_attempts: u32) {
        self.failed_jobs += 1;
        self.update_execution_time(execution_time_ms);
        self.update_success_rate();
        self.update_retry_attempts(retry_attempts);
    }

    pub fn record_retry(&mut self) {
        self.retried_jobs += 1;
    }

    pub fn record_timeout(&mut self, execution_time_ms: u64) {
        self.timeout_jobs += 1;
        self.update_execution_time(execution_time_ms);
    }

    pub fn record_cancellation(&mut self, execution_time_ms: u64) {
        self.cancelled_jobs += 1;
        self.update_execution_time(execution_time_ms);
    }

    pub fn reset(&mut self) {
        *self = Self::new();
    }

    fn update_execution_time(&mut self, execution_time_ms: u64) {
        if self.min_execution_time_ms == 0 || execution_time_ms < self.min_execution_time_ms {
            self.min_execution_time_ms = execution_time_ms;
        }
        if execution_time_ms > self.max_execution_time_ms {
            self.max_execution_time_ms = execution_time_ms;
        }

        let completed_jobs = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
        if completed_jobs > 0 {
            let new_sample = execution_time_ms as f64;
            self.avg_execution_time_ms += (new_sample - self.avg_execution_time_ms) / completed_jobs as f64;
        }
    }

    fn update_success_rate(&mut self) {
        let total_completed = self.successful_jobs + self.failed_jobs + self.timeout_jobs + self.cancelled_jobs;
        if total_completed > 0 {
            self.success_rate = self.successful_jobs as f64 / total_completed as f64;
        }
    }

    fn update_retry_attempts(&mut self, attempts: u32) {
        if self.failed_jobs > 0 {
            let new_sample = attempts as f64;
            self.avg_retry_attempts += (new_sample - self.avg_retry_attempts) / self.failed_jobs as f64;
        }
    }
}

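/// Thread-safe wrapper around [`JobMetrics`] that also times executions.
///
/// Execution start times are stored per [`JobId`](crate::JobId); completion
/// calls (`record_job_success`, `record_job_failure`, etc.) look up the start
/// time to compute the elapsed milliseconds, falling back to zero when no
/// start was recorded.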
#[derive(Debug)]
pub struct JobMetricsCollector {
    metrics: Arc<parking_lot::RwLock<JobMetrics>>,
    active_executions: Arc<parking_lot::RwLock<std::collections::HashMap<crate::JobId, std::time::Instant>>>,
}

impl JobMetricsCollector {
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(parking_lot::RwLock::new(JobMetrics::new())),
            active_executions: Arc::new(parking_lot::RwLock::new(std::collections::HashMap::new())),
        }
    }

    pub fn record_job_scheduled(&self, job_type: &str, priority: Priority) {
        let mut metrics = self.metrics.write();
        metrics.record_scheduled(job_type, priority);
    }

    pub fn record_execution_start(&self, job_id: crate::JobId) {
        let mut metrics = self.metrics.write();
        let mut executions = self.active_executions.write();

        metrics.record_execution_start();
        executions.insert(job_id, std::time::Instant::now());
    }

    pub fn record_job_success(&self, job_id: crate::JobId) {
        let execution_time = self.get_and_remove_execution_time(job_id);
        let mut metrics = self.metrics.write();
        metrics.record_success(execution_time);
    }

    pub fn record_job_failure(&self, job_id: crate::JobId, retry_attempts: u32) {
        let execution_time = self.get_and_remove_execution_time(job_id);
        let mut metrics = self.metrics.write();
        metrics.record_failure(execution_time, retry_attempts);
    }

    pub fn record_job_retry(&self, _job_id: crate::JobId) {
        let mut metrics = self.metrics.write();
        metrics.record_retry();
    }

    pub fn record_job_timeout(&self, job_id: crate::JobId) {
        let execution_time = self.get_and_remove_execution_time(job_id);
        let mut metrics = self.metrics.write();
        metrics.record_timeout(execution_time);
    }

    pub fn record_job_cancellation(&self, job_id: crate::JobId) {
        let execution_time = self.get_and_remove_execution_time(job_id);
        let mut metrics = self.metrics.write();
        metrics.record_cancellation(execution_time);
    }

    pub fn get_metrics(&self) -> JobMetrics {
        self.metrics.read().clone()
    }

    pub fn reset_metrics(&self) {
        let mut metrics = self.metrics.write();
        let mut executions = self.active_executions.write();

        metrics.reset();
        executions.clear();
    }

    fn get_and_remove_execution_time(&self, job_id: crate::JobId) -> u64 {
        let mut executions = self.active_executions.write();
        if let Some(start_time) = executions.remove(&job_id) {
            start_time.elapsed().as_millis() as u64
        } else {
            0
        }
    }

    pub fn active_executions_count(&self) -> usize {
        self.active_executions.read().len()
    }
}

impl Default for JobMetricsCollector {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use crate::{MemoryBackend, QueueConfig};

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        message: String,
    }

    #[async_trait::async_trait]
    impl Job for TestJob {
        async fn execute(&self) -> crate::JobResult<()> {
            println!("Executing test job: {}", self.message);
            Ok(())
        }

        fn job_type(&self) -> &'static str {
            "test_job"
        }
    }

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct CancellableTestJob {
        message: String,
        sleep_duration: Duration,
    }

    #[async_trait::async_trait]
    impl Job for CancellableTestJob {
        async fn execute(&self) -> crate::JobResult<()> {
            tokio::time::sleep(self.sleep_duration).await;
            Ok(())
        }

        fn job_type(&self) -> &'static str {
            "cancellable_test_job"
        }
    }

    #[async_trait::async_trait]
    impl CancellableJob for CancellableTestJob {
        async fn execute_with_cancellation(&self, token: &CancellationToken) -> crate::JobResult<()> {
            tokio::select! {
                _ = tokio::time::sleep(self.sleep_duration) => {
                    println!("Job completed: {}", self.message);
                    Ok(())
                }
                _ = token.cancelled() => {
                    println!("Job cancelled: {}", self.message);
                    Err("Job was cancelled".into())
                }
            }
        }
    }

    #[test]
    fn test_cron_expression_validation() {
        assert!(CronExpression::new("0 0 0 * * *").is_ok());
        assert!(CronExpression::new("0 */5 * * * *").is_ok());
        assert!(CronExpression::new("0 0 9-17 * * 1-5").is_ok());
        assert!(CronExpression::new("invalid").is_err());
        assert!(CronExpression::new("* * * * *").is_err());
    }

    #[test]
    fn test_cron_next_run_time() {
        let cron = CronExpression::new("0 0 0 * * *").unwrap();
        let now = Utc::now();
        let next = cron.next_run_time(now);

        assert!(next.is_some());
        assert!(next.unwrap() > now);
    }

    #[test]
    fn test_cron_presets() {
        assert!(cron_presets::every_minute().next_run_time(Utc::now()).is_some());
        assert!(cron_presets::hourly().next_run_time(Utc::now()).is_some());
        assert!(cron_presets::daily().next_run_time(Utc::now()).is_some());
        assert!(cron_presets::weekly().next_run_time(Utc::now()).is_some());
        assert!(cron_presets::monthly().next_run_time(Utc::now()).is_some());
        assert!(cron_presets::weekdays_at_9am().next_run_time(Utc::now()).is_some());
    }

    #[test]
    fn test_retry_strategy_exponential() {
        let strategy = RetryStrategy::Exponential {
            initial_delay: Duration::from_secs(1),
            multiplier: 2.0,
            max_delay: Duration::from_secs(60),
            max_attempts: 3,
            jitter: false,
        };

        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(2)));
        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(4)));
        assert_eq!(strategy.delay_for_attempt(3), None);
        assert_eq!(strategy.max_attempts(), 3);
    }

    #[test]
    fn test_retry_strategy_linear() {
        let strategy = RetryStrategy::Linear {
            initial_delay: Duration::from_secs(5),
            increment: Duration::from_secs(10),
            max_delay: Duration::from_secs(60),
            max_attempts: 4,
        };

        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(5)));
        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(15)));
        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(25)));
        assert_eq!(strategy.delay_for_attempt(3), Some(Duration::from_secs(35)));
        assert_eq!(strategy.delay_for_attempt(4), None);
        assert_eq!(strategy.max_attempts(), 4);
    }

    #[test]
    fn test_retry_strategy_fixed() {
        let strategy = RetryStrategy::Fixed {
            delay: Duration::from_secs(10),
            max_attempts: 2,
        };

        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(10)));
        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(10)));
        assert_eq!(strategy.delay_for_attempt(2), None);
        assert_eq!(strategy.max_attempts(), 2);
    }

    #[test]
    fn test_retry_strategy_custom() {
        let strategy = RetryStrategy::Custom {
            delays: vec![
                Duration::from_secs(1),
                Duration::from_secs(5),
                Duration::from_secs(30),
            ],
        };

        assert_eq!(strategy.delay_for_attempt(0), Some(Duration::from_secs(1)));
        assert_eq!(strategy.delay_for_attempt(1), Some(Duration::from_secs(5)));
        assert_eq!(strategy.delay_for_attempt(2), Some(Duration::from_secs(30)));
        assert_eq!(strategy.delay_for_attempt(3), None);
        assert_eq!(strategy.max_attempts(), 3);
    }

    #[test]
    fn test_scheduled_job_creation() {
        let job = TestJob {
            message: "Hello, World!".to_string(),
        };

        let scheduled = ScheduledJob::new(
            "test_schedule".to_string(),
            "0 0 0 * * *",
            job,
            Some(Priority::High),
            None,
        ).unwrap();

        assert_eq!(scheduled.id, "test_schedule");
        assert_eq!(scheduled.job_type, "test_job");
        assert_eq!(scheduled.priority, Priority::High);
        assert!(scheduled.enabled);
        assert!(scheduled.next_run.is_some());
    }

    #[tokio::test]
    async fn test_job_scheduler_basic() {
        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
        let scheduler = JobScheduler::new(backend);

        let job = TestJob {
            message: "Scheduled job".to_string(),
        };

        let scheduled = ScheduledJob::new(
            "test_schedule".to_string(),
            "0 * * * * *",
            job,
            Some(Priority::Normal),
            None,
        ).unwrap();

        scheduler.add_schedule(scheduled).unwrap();

        let schedules = scheduler.list_schedules();
        assert_eq!(schedules.len(), 1);
        assert_eq!(schedules[0].id, "test_schedule");

        let retrieved = scheduler.get_schedule("test_schedule");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, "test_schedule");

        assert!(scheduler.remove_schedule("test_schedule").unwrap());
        assert!(scheduler.get_schedule("test_schedule").is_none());
    }

    #[tokio::test]
    async fn test_cancellation_token() {
        let token = CancellationToken::new();

        assert!(!token.is_cancelled());

        token.cancel();
        assert!(token.is_cancelled());

        token.wait_for_cancellation().await;

        let cloned = token.clone();
        assert!(cloned.is_cancelled());
        cloned.wait_for_cancellation().await;
    }

    #[tokio::test]
    async fn test_cancellation_token_async_notification() {
        let token = CancellationToken::new();
        let token_clone = token.clone();

        let wait_task = tokio::spawn(async move {
            token_clone.wait_for_cancellation().await;
            "cancelled"
        });

        tokio::time::sleep(Duration::from_millis(1)).await;

        assert!(!wait_task.is_finished());

        token.cancel();

        let result = tokio::time::timeout(Duration::from_millis(100), wait_task).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap().unwrap(), "cancelled");
    }

    #[test]
    fn test_job_cancellation_manager() {
        let manager = JobCancellationManager::new();
        let job_id = crate::JobId::new_v4();

        let token = manager.register_job(job_id);
        assert_eq!(manager.active_job_count(), 1);
        assert!(manager.active_jobs().contains(&job_id));

        assert!(!token.is_cancelled());

        assert!(manager.cancel_job(job_id));
        assert!(token.is_cancelled());

        manager.unregister_job(job_id);
        assert_eq!(manager.active_job_count(), 0);

        assert!(!manager.cancel_job(job_id));
    }

    #[test]
    fn test_job_metrics() {
        let mut metrics = JobMetrics::new();

        metrics.record_scheduled("test_job", Priority::High);
        metrics.record_scheduled("test_job", Priority::Normal);
        metrics.record_scheduled("email_job", Priority::High);

        assert_eq!(metrics.total_scheduled, 3);
        assert_eq!(*metrics.jobs_by_type.get("test_job").unwrap(), 2);
        assert_eq!(*metrics.jobs_by_type.get("email_job").unwrap(), 1);
        assert_eq!(*metrics.jobs_by_priority.get("High").unwrap(), 2);
        assert_eq!(*metrics.jobs_by_priority.get("Normal").unwrap(), 1);

        metrics.record_execution_start();
        metrics.record_execution_start();
        assert_eq!(metrics.total_executed, 2);

        metrics.record_success(100);
        assert_eq!(metrics.successful_jobs, 1);
        assert_eq!(metrics.min_execution_time_ms, 100);
        assert_eq!(metrics.max_execution_time_ms, 100);
        assert_eq!(metrics.avg_execution_time_ms, 100.0);

        metrics.record_failure(200, 2);
        assert_eq!(metrics.failed_jobs, 1);
        assert_eq!(metrics.avg_retry_attempts, 2.0);
        assert_eq!(metrics.max_execution_time_ms, 200);
        assert_eq!(metrics.avg_execution_time_ms, 150.0);
        assert_eq!(metrics.success_rate, 0.5);
    }

    #[test]
    fn test_job_metrics_collector() {
        let collector = JobMetricsCollector::new();
        let job_id = crate::JobId::new_v4();

        collector.record_job_scheduled("test_job", Priority::High);

        collector.record_execution_start(job_id);
        assert_eq!(collector.active_executions_count(), 1);

        std::thread::sleep(Duration::from_millis(10));
        collector.record_job_success(job_id);
        assert_eq!(collector.active_executions_count(), 0);

        let metrics = collector.get_metrics();
        assert_eq!(metrics.total_scheduled, 1);
        assert_eq!(metrics.total_executed, 1);
        assert_eq!(metrics.successful_jobs, 1);
        assert_eq!(metrics.success_rate, 1.0);
        assert!(metrics.min_execution_time_ms >= 10);
    }

    #[tokio::test]
    async fn test_atomic_clear_dead_jobs() {
        use crate::{MemoryBackend, QueueConfig, JobEntry, Priority, QueueBackend};

        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
        let scheduler = JobScheduler::new(backend.clone());

        let job1 = TestJob { message: "job1".to_string() };
        let job2 = TestJob { message: "job2".to_string() };
        let job3 = TestJob { message: "job3".to_string() };

        let mut entry1 = JobEntry::new(job1, Some(Priority::Normal), None).unwrap();
        let mut entry2 = JobEntry::new(job2, Some(Priority::High), None).unwrap();
        let mut entry3 = JobEntry::new(job3, Some(Priority::Low), None).unwrap();

        for _ in 0..=3 {
            entry1.mark_failed("Test failure".to_string());
            entry2.mark_failed("Test failure".to_string());
            entry3.mark_failed("Test failure".to_string());
        }

        backend.enqueue(entry1).await.unwrap();
        backend.enqueue(entry2).await.unwrap();
        backend.enqueue(entry3).await.unwrap();

        let dead_jobs_before = scheduler.get_dead_jobs(None).await.unwrap();
        assert_eq!(dead_jobs_before.len(), 3);

        let cleared_count = scheduler.clear_dead_jobs().await.unwrap();
        assert_eq!(cleared_count, 3);

        let dead_jobs_after = scheduler.get_dead_jobs(None).await.unwrap();
        assert_eq!(dead_jobs_after.len(), 0);

        let stats = backend.stats().await.unwrap();
        assert_eq!(stats.dead_jobs, 0);
    }

    #[tokio::test]
    async fn test_atomic_requeue_dead_job() {
        use crate::{MemoryBackend, QueueConfig, JobEntry, JobState, Priority, QueueBackend};

        let backend = std::sync::Arc::new(MemoryBackend::new(QueueConfig::default()));
        let scheduler = JobScheduler::new(backend.clone());

        let job = TestJob { message: "dead job".to_string() };
        let mut entry = JobEntry::new(job, Some(Priority::Normal), None).unwrap();
        let job_id = entry.id();

        for _ in 0..=3 {
            entry.mark_failed("Test failure".to_string());
        }
        backend.enqueue(entry).await.unwrap();

        let stats_before = backend.stats().await.unwrap();
        assert_eq!(stats_before.dead_jobs, 1);
        assert_eq!(stats_before.pending_jobs, 0);

        let requeued = scheduler.requeue_dead_job(job_id).await.unwrap();
        assert!(requeued);

        let stats_after = backend.stats().await.unwrap();
        assert_eq!(stats_after.dead_jobs, 0);
        assert_eq!(stats_after.pending_jobs, 1);

        let job_entry = backend.get_job(job_id).await.unwrap().unwrap();
        assert_eq!(job_entry.state(), &JobState::Pending);
        assert_eq!(job_entry.attempts(), 0);
    }
}