use crate::error::CronResult;
use crate::expression::CronExpression;
use chrono::{DateTime, Utc};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
/// Type-erased, shareable job callback.
///
/// The callback receives a [`JobContext`] by value and returns a pinned, boxed,
/// `Send` future that resolves to `CronResult<()>`. The callback itself must be
/// `Send + Sync` and is stored behind an `Arc`.
pub type JobFn =
Arc<dyn Fn(JobContext) -> Pin<Box<dyn Future<Output = CronResult<()>> + Send>> + Send + Sync>;
/// Per-execution information passed to a job's callback.
#[derive(Debug, Clone)]
pub struct JobContext {
/// Name of the job this execution belongs to.
pub name: String,
/// Time the execution was scheduled for.
pub scheduled_time: DateTime<Utc>,
/// Time the execution actually began; `JobContext::new` sets this to `Utc::now()`.
pub execution_time: DateTime<Utc>,
/// Execution counter supplied by the caller. `Job::execute` passes the job's
/// count *before* the current run is added, so the first run sees `0`.
pub execution_count: u64,
}
impl JobContext {
    /// Builds a context for one execution of the job called `name`.
    ///
    /// `scheduled_time` and `execution_count` are stored as given, while
    /// `execution_time` is stamped with the current UTC time at the moment of
    /// construction.
    pub fn new(name: String, scheduled_time: DateTime<Utc>, execution_count: u64) -> Self {
        let execution_time = Utc::now();
        Self {
            name,
            scheduled_time,
            execution_time,
            execution_count,
        }
    }

    /// Returns how far `execution_time` lies after `scheduled_time`.
    ///
    /// The result is negative when the execution time precedes the scheduled time.
    pub fn delay(&self) -> chrono::Duration {
        self.execution_time
            .signed_duration_since(self.scheduled_time)
    }
}
/// Lifecycle state of a [`Job`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobStatus {
/// Initial state assigned by `Job::new`; the job has not been executed yet.
Scheduled,
/// Set by `Job::execute` just before the job callback is invoked.
Running,
/// The most recent execution's callback returned `Ok(())`.
Completed,
/// The most recent execution's callback returned an error; holds that
/// error's `to_string()` output.
Failed(String),
}
/// A named unit of work bound to a cron expression, together with its run state.
pub struct Job {
/// Name of the job; copied into every [`JobContext`] created by `execute`.
pub name: String,
/// Cron expression whose `next()` result is stored in `next_run`.
pub expression: CronExpression,
/// Callback invoked by `execute`.
pub function: JobFn,
/// Current lifecycle state; starts as `JobStatus::Scheduled`.
pub status: JobStatus,
/// Next time the job is due. While this is `None`, `should_run` returns `false`.
pub next_run: Option<DateTime<Utc>>,
/// Time recorded by `execute` right after the callback's future finished;
/// `None` until the first execution completes.
pub last_run: Option<DateTime<Utc>>,
/// Number of executions that ran the callback to completion (successes and
/// failures both count).
pub execution_count: u64,
/// When `false`, `should_run` returns `false` and `execute` does nothing.
pub enabled: bool,
/// When `true` (the default from `Job::new`), `should_run` and `execute`
/// skip the job while `status` is `JobStatus::Running`.
pub prevent_overlap: bool,
/// Free-form string key/value pairs; see `set_metadata` and `get_metadata`.
pub metadata: std::collections::HashMap<String, String>,
}
impl Job {
pub fn new<F, Fut>(name: impl Into<String>, expression: CronExpression, function: F) -> Self
where
F: Fn(JobContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = CronResult<()>> + Send + 'static,
{
let name = name.into();
let next_run = expression.next();
let wrapped_fn = Arc::new(
move |ctx: JobContext| -> Pin<Box<dyn Future<Output = CronResult<()>> + Send>> {
Box::pin(function(ctx))
},
);
Self {
name,
expression,
function: wrapped_fn,
status: JobStatus::Scheduled,
next_run,
last_run: None,
execution_count: 0,
enabled: true,
prevent_overlap: true,
metadata: std::collections::HashMap::new(),
}
}
pub fn should_run(&self) -> bool {
if !self.enabled {
return false;
}
if self.prevent_overlap && self.status == JobStatus::Running {
return false;
}
if let Some(next_run) = self.next_run {
Utc::now() >= next_run
} else {
false
}
}
pub async fn execute(&mut self) -> CronResult<()> {
if !self.enabled {
return Ok(());
}
if self.prevent_overlap && self.status == JobStatus::Running {
return Ok(());
}
self.status = JobStatus::Running;
let context = JobContext::new(
self.name.clone(),
self.next_run.unwrap_or_else(Utc::now),
self.execution_count,
);
let result = (self.function)(context).await;
self.last_run = Some(Utc::now());
self.execution_count += 1;
match result {
Ok(()) => {
self.status = JobStatus::Completed;
self.next_run = self.expression.next();
Ok(())
}
Err(e) => {
self.status = JobStatus::Failed(e.to_string());
self.next_run = self.expression.next();
Err(e)
}
}
}
pub fn enable(&mut self) {
self.enabled = true;
if self.next_run.is_none() {
self.next_run = self.expression.next();
}
}
pub fn disable(&mut self) {
self.enabled = false;
}
pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.metadata.insert(key.into(), value.into());
}
pub fn get_metadata(&self, key: &str) -> Option<&String> {
self.metadata.get(key)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CronExpression;
    use crate::error::CronError;

    /// Cron schedule shared by the tests that do not depend on timing.
    const SCHEDULE: &str = "0 * * * * *";

    /// Parses [`SCHEDULE`], panicking if the parser rejects it.
    fn schedule() -> CronExpression {
        CronExpression::parse(SCHEDULE).unwrap()
    }

    /// Builds a job whose callback always succeeds.
    fn ok_job(name: &str) -> Job {
        Job::new(name, schedule(), |_ctx| async { Ok(()) })
    }

    /// Builds a job whose callback always fails with `ExecutionFailed("test error")`.
    fn failing_job(name: &str) -> Job {
        Job::new(name, schedule(), |_ctx| async {
            Err(CronError::ExecutionFailed("test error".to_string()))
        })
    }

    // Tests that never await use plain `#[test]`; constructing a job needs no runtime.

    #[test]
    fn test_job_creation() {
        let job = ok_job("test");
        assert_eq!(job.name, "test");
        assert_eq!(job.execution_count, 0);
        assert!(job.enabled);
    }

    #[tokio::test]
    async fn test_job_execution() {
        let mut job = ok_job("test");
        job.next_run = Some(Utc::now());
        let result = job.execute().await;
        assert!(result.is_ok());
        assert_eq!(job.execution_count, 1);
        assert_eq!(job.status, JobStatus::Completed);
        assert!(job.last_run.is_some());
    }

    #[tokio::test]
    async fn test_job_failure() {
        let mut job = failing_job("test");
        job.next_run = Some(Utc::now());
        let result = job.execute().await;
        assert!(result.is_err());
        assert!(matches!(job.status, JobStatus::Failed(_)));
    }

    #[test]
    fn test_job_enable_disable() {
        let mut job = ok_job("test");
        assert!(job.enabled);
        job.disable();
        assert!(!job.enabled);
        job.enable();
        assert!(job.enabled);
    }

    #[test]
    fn test_job_initial_execution_count() {
        let job = ok_job("test");
        assert_eq!(job.execution_count, 0);
    }

    #[tokio::test]
    async fn test_job_execution_increments_count() {
        let mut job = ok_job("counter");
        let initial_count = job.execution_count;
        let _ = job.execute().await;
        assert_eq!(job.execution_count, initial_count + 1);
    }

    #[tokio::test]
    async fn test_job_success_status() {
        let mut job = ok_job("success");
        let _ = job.execute().await;
        assert!(matches!(job.status, JobStatus::Completed));
    }

    #[tokio::test]
    async fn test_job_failure_status() {
        let mut job = failing_job("failure");
        let _ = job.execute().await;
        assert!(matches!(job.status, JobStatus::Failed(_)));
    }

    #[tokio::test]
    async fn test_job_multiple_executions() {
        let mut job = ok_job("multi");
        for _ in 0..3 {
            let _ = job.execute().await;
        }
        assert_eq!(job.execution_count, 3);
    }

    #[test]
    fn test_job_status_before_execution() {
        let job = ok_job("test");
        assert!(matches!(job.status, JobStatus::Scheduled));
    }

    #[test]
    fn test_job_creation_with_different_schedules() {
        let schedules = ["0 * * * * *", "0 0 * * * *", "0 0 0 * * *"];
        for schedule in schedules {
            let expr = CronExpression::parse(schedule).unwrap();
            let job = Job::new("test", expr, |_ctx| async { Ok(()) });
            assert_eq!(job.name, "test");
        }
    }

    #[tokio::test]
    async fn test_job_context_data() {
        let mut job = Job::new("ctx_test", schedule(), |ctx| async move {
            assert_eq!(ctx.name, "ctx_test");
            // The context carries the count from before this run is recorded.
            assert_eq!(ctx.execution_count, 0);
            Ok(())
        });
        let result = job.execute().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_job_name_consistency() {
        let mut job = ok_job("consistent");
        assert_eq!(job.name, "consistent");
        // Toggling the enabled flag must not affect the name.
        job.disable();
        job.enable();
        assert_eq!(job.name, "consistent");
    }

    #[tokio::test]
    async fn test_job_disabled_flag() {
        let mut job = ok_job("disabled");
        job.disable();
        assert!(!job.enabled);
        assert!(!job.should_run());
        // Executing a disabled job is a successful no-op.
        assert!(job.execute().await.is_ok());
        assert_eq!(job.execution_count, 0);
        assert_eq!(job.status, JobStatus::Scheduled);
    }

    #[tokio::test]
    async fn test_job_mixed_results() {
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
        let counter_clone = counter.clone();
        let mut job = Job::new("mixed", schedule(), move |_ctx| {
            let c = counter_clone.clone();
            async move {
                let count = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                if count.is_multiple_of(2) {
                    Ok(())
                } else {
                    Err(CronError::ExecutionFailed("odd execution".to_string()))
                }
            }
        });
        for _ in 0..4 {
            let _ = job.execute().await;
        }
        assert_eq!(job.execution_count, 4);
        assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 4);
        // The fourth call observes counter value 3 (odd), so the last run failed.
        assert!(matches!(job.status, JobStatus::Failed(_)));
    }
}