use chrono::{DateTime, Duration, Utc};
use std::future::Future;
use std::sync::Arc;
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::prelude::*;
use crate::storage::{Storage, StorageError};
/// Default cap on how many due jobs of a single kind are claimed in one polling pass.
const DEFAULT_SCHEDULER_BATCH_SIZE: usize = 1_000;
/// Background poller that promotes due scheduled jobs and due retry jobs.
///
/// On every tick it asks the storage backend to claim a bounded batch of each
/// kind; it also offers helpers to persist a job in the `Scheduled` state.
pub struct JobScheduler {
    /// Shared storage backend used both for claiming and for scheduling jobs.
    storage: Arc<dyn Storage>,
    /// Delay between two consecutive polling passes.
    poll_interval: Duration,
    /// Maximum number of jobs claimed per kind during a single pass.
    batch_size: usize,
}
impl JobScheduler {
    /// Creates a scheduler that polls `storage` every 30 seconds.
    ///
    /// Each pass claims up to `DEFAULT_SCHEDULER_BATCH_SIZE` (1000) jobs of each kind.
    pub fn new(storage: Arc<dyn Storage>) -> Self {
        Self::with_poll_interval(storage, Duration::seconds(30))
    }

    /// Creates a scheduler with a custom `poll_interval` and the default batch size.
    ///
    /// The interval is validated when the loop starts. A negative or zero interval
    /// makes [`JobScheduler::run`] / [`JobScheduler::run_until_cancelled`] return a
    /// configuration error instead of starting.
    pub fn with_poll_interval(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
        Self {
            storage,
            poll_interval,
            batch_size: DEFAULT_SCHEDULER_BATCH_SIZE,
        }
    }

    /// Sets the maximum number of jobs claimed per kind in a single pass.
    ///
    /// The two kinds are scheduled jobs and retry jobs. A value of `0` would stop
    /// the scheduler from ever promoting anything, so it is raised to `1`.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size.max(1);
        self
    }

    /// Runs the polling loop indefinitely.
    ///
    /// This delegates to [`JobScheduler::run_until_cancelled`] with a fresh token
    /// that nothing else holds. The token is never cancelled, so this only returns
    /// on a configuration error.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::ConfigurationError` if the poll interval is negative or zero.
    pub async fn run(&self) -> Result<()> {
        self.run_until_cancelled(CancellationToken::new()).await
    }

    /// Runs the polling loop until `cancel` is triggered.
    ///
    /// The first pass runs immediately. After that, one pass runs per poll
    /// interval. Each pass claims due scheduled jobs and then due retry jobs.
    /// Failures inside a pass are logged and do not stop the loop. Cancellation
    /// is checked before every tick and is preferred over a ready tick.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::ConfigurationError` if the poll interval is negative
    /// (not representable as `std::time::Duration`) or zero.
    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
        info!(
            "Starting job scheduler with poll interval: {:?}",
            self.poll_interval
        );
        let period = self
            .poll_interval
            .to_std()
            .map_err(|e| QmlError::ConfigurationError {
                message: format!("Invalid poll interval: {}", e),
            })?;
        // `tokio::time::interval` panics on a zero period; report it as a
        // configuration error instead of crashing the caller's task.
        if period.is_zero() {
            return Err(QmlError::ConfigurationError {
                message: "Invalid poll interval: must be greater than zero".to_string(),
            });
        }
        let mut ticker = interval(period);
        // If a pass overruns the interval, wait a full period before the next one
        // instead of replaying every missed tick back-to-back against storage.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            tokio::select! {
                // Poll the cancellation branch first so shutdown wins over a ready tick.
                biased;
                _ = cancel.cancelled() => {
                    debug!("Scheduler loop exiting on cancellation");
                    return Ok(());
                }
                _ = ticker.tick() => {}
            }
            self.promote_due_jobs().await;
        }
    }

    /// Runs one promotion pass for scheduled jobs, then one for retry jobs.
    ///
    /// Errors are logged rather than propagated. A failing pass therefore neither
    /// stops the polling loop nor prevents the other kind from being processed.
    async fn promote_due_jobs(&self) {
        if let Err(e) = self
            .process_due_jobs("scheduled", |storage, now, limit| async move {
                storage.claim_due_scheduled_jobs(now, limit).await
            })
            .await
        {
            error!("Error processing scheduled jobs: {}", e);
        }
        if let Err(e) = self
            .process_due_jobs("retry", |storage, now, limit| async move {
                storage.claim_due_retry_jobs(now, limit).await
            })
            .await
        {
            error!("Error processing retry jobs: {}", e);
        }
    }

    /// Claims up to `self.batch_size` jobs of one `kind` that are due at "now".
    ///
    /// `claim` receives the storage handle, the current time and the batch limit.
    /// It is expected to both select and promote the due jobs; this method only
    /// logs how many were returned. `kind` is used solely for log and error text.
    ///
    /// # Errors
    ///
    /// Wraps any `StorageError` from `claim` in `QmlError::StorageError`.
    async fn process_due_jobs<F, Fut>(&self, kind: &str, claim: F) -> Result<()>
    where
        F: FnOnce(Arc<dyn Storage>, DateTime<Utc>, usize) -> Fut,
        Fut: Future<Output = std::result::Result<Vec<Job>, StorageError>>,
    {
        debug!("Checking for {} jobs ready for execution", kind);
        let now = Utc::now();
        let claimed_jobs = claim(self.storage.clone(), now, self.batch_size)
            .await
            .map_err(|e| QmlError::StorageError {
                message: format!("Failed to claim due {} jobs: {}", kind, e),
            })?;
        if !claimed_jobs.is_empty() {
            info!(
                "Promoted {} {} job(s) to Enqueued",
                claimed_jobs.len(),
                kind
            );
        }
        Ok(())
    }

    /// Moves `job` into the `Scheduled` state for `execute_at` and persists it.
    ///
    /// The job is written through `Storage::enqueue`.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Job::set_state`; presumably this is an invalid
    /// state transition, but verify against `Job`. Returns
    /// `QmlError::StorageError` if persisting fails.
    pub async fn schedule_job(
        &self,
        mut job: Job,
        execute_at: DateTime<Utc>,
        reason: impl Into<String>,
    ) -> Result<()> {
        let scheduled_state = JobState::scheduled(execute_at, reason);
        job.set_state(scheduled_state)?;
        self.storage
            .enqueue(&job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: format!("Failed to schedule job: {}", e),
            })?;
        info!("Scheduled job {} for execution at {}", job.id, execute_at);
        Ok(())
    }

    /// Schedules `job` to run `delay` after the current time.
    ///
    /// This is a thin wrapper over [`JobScheduler::schedule_job`].
    ///
    /// # Errors
    ///
    /// Same as [`JobScheduler::schedule_job`].
    pub async fn schedule_job_in(
        &self,
        job: Job,
        delay: Duration,
        reason: impl Into<String>,
    ) -> Result<()> {
        let execute_at = Utc::now() + delay;
        self.schedule_job(job, execute_at, reason).await
    }

    /// Returns the configured delay between polling passes.
    pub fn poll_interval(&self) -> Duration {
        self.poll_interval
    }

    /// Returns the maximum number of jobs claimed per kind in a single pass.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::{MemoryStorage, MonitoringApi};

    #[tokio::test]
    async fn test_schedule_job() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());
        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();
        let execute_at = Utc::now() + Duration::seconds(1);
        scheduler
            .schedule_job(job, execute_at, "test")
            .await
            .unwrap();
        let stored_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(stored_job.state, JobState::Scheduled { .. }));
    }

    #[tokio::test]
    async fn test_process_scheduled_jobs() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());
        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();
        let execute_at = Utc::now() - Duration::seconds(1);
        scheduler
            .schedule_job(job, execute_at, "test")
            .await
            .unwrap();
        scheduler
            .process_due_jobs("scheduled", |storage, now, limit| async move {
                storage.claim_due_scheduled_jobs(now, limit).await
            })
            .await
            .unwrap();
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Enqueued { .. }));
    }

    #[tokio::test]
    async fn fetch_due_scheduled_jobs_bounds_to_limit_and_past_due() {
        let storage = Arc::new(MemoryStorage::new());
        for _ in 0..1000 {
            let mut job = Job::new("noop", serde_json::Value::Null);
            job.set_state(JobState::scheduled(
                Utc::now() + Duration::hours(1),
                "future",
            ))
            .unwrap();
            storage.enqueue(&job).await.unwrap();
        }
        let mut due_ids = Vec::with_capacity(10);
        for _ in 0..10 {
            let mut job = Job::new("noop", serde_json::Value::Null);
            job.set_state(JobState::scheduled(
                Utc::now() - Duration::seconds(5),
                "past",
            ))
            .unwrap();
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }
        // With 10 jobs due, a limit smaller than that must cap the result.
        let bounded = storage
            .fetch_due_scheduled_jobs(Utc::now(), 3)
            .await
            .unwrap();
        assert_eq!(bounded.len(), 3, "storage must honor the fetch limit");
        for job in &bounded {
            assert!(due_ids.contains(&job.id));
        }
        let due = storage
            .fetch_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert_eq!(
            due.len(),
            10,
            "storage should only return the 10 past-due jobs"
        );
        for job in &due {
            assert!(due_ids.contains(&job.id));
        }
        let scheduler = JobScheduler::new(storage.clone());
        scheduler
            .process_due_jobs("scheduled", |storage, now, limit| async move {
                storage.claim_due_scheduled_jobs(now, limit).await
            })
            .await
            .unwrap();
        for id in &due_ids {
            let job = storage.get(id).await.unwrap().unwrap();
            assert!(
                matches!(job.state, JobState::Enqueued { .. }),
                "job {} should have moved to Enqueued",
                id
            );
        }
    }

    #[tokio::test]
    async fn fetch_due_retry_jobs_filters_future_retries() {
        let storage = Arc::new(MemoryStorage::new());
        let mut future_retry = Job::new("noop", serde_json::Value::Null);
        future_retry.state = JobState::awaiting_retry(Utc::now() + Duration::minutes(10), "later");
        storage.enqueue(&future_retry).await.unwrap();
        let mut due_retry = Job::new("noop", serde_json::Value::Null);
        due_retry.state = JobState::awaiting_retry(Utc::now() - Duration::seconds(1), "now");
        let due_id = due_retry.id.clone();
        storage.enqueue(&due_retry).await.unwrap();
        let due = storage.fetch_due_retry_jobs(Utc::now(), 100).await.unwrap();
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].id, due_id);
    }

    #[tokio::test]
    async fn claim_due_scheduled_jobs_returns_enqueued_state_atomically() {
        let storage = Arc::new(MemoryStorage::new());
        let mut due_ids = Vec::new();
        for _ in 0..5 {
            let mut job = Job::new("noop", serde_json::Value::Null);
            job.set_state(JobState::scheduled(
                Utc::now() - Duration::seconds(5),
                "past",
            ))
            .unwrap();
            due_ids.push(job.id.clone());
            storage.enqueue(&job).await.unwrap();
        }
        let claimed = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert_eq!(claimed.len(), 5, "all 5 due jobs must be claimed");
        for job in &claimed {
            assert!(
                matches!(job.state, JobState::Enqueued { .. }),
                "claim returns jobs already in Enqueued state, got {:?}",
                job.state
            );
        }
        let again = storage
            .claim_due_scheduled_jobs(Utc::now(), 100)
            .await
            .unwrap();
        assert!(
            again.is_empty(),
            "second claim must not re-pick already-promoted jobs"
        );
        for id in &due_ids {
            let job = storage.get(id).await.unwrap().unwrap();
            assert!(matches!(job.state, JobState::Enqueued { .. }));
        }
    }

    #[tokio::test]
    async fn run_until_cancelled_returns_ok_when_already_cancelled() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage);
        let cancel = CancellationToken::new();
        cancel.cancel();
        // The cancellation branch is polled first (`biased`), so the loop must
        // return without waiting for a tick.
        scheduler.run_until_cancelled(cancel).await.unwrap();
    }

    #[tokio::test]
    async fn test_schedule_job_in() {
        let storage = Arc::new(MemoryStorage::new());
        let scheduler = JobScheduler::new(storage.clone());
        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();
        scheduler
            .schedule_job_in(job, Duration::minutes(5), "delayed")
            .await
            .unwrap();
        let stored_job = storage.get(&job_id).await.unwrap().unwrap();
        if let JobState::Scheduled { enqueue_at, .. } = stored_job.state {
            assert!(enqueue_at > Utc::now());
        } else {
            panic!("Job should be in Scheduled state");
        }
    }
}