use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use anyhow::{Context as _, Result, anyhow};
use serde::{Deserialize, Serialize};
use crate::worker_api::WorkerPool;
/// Urgency level attached to a job.
///
/// Comparison goes through an explicit numeric rank (see `ordinal`), giving
/// `Low < Normal < High < Critical`. `Normal` is the `Default` value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobPriority {
    /// Least urgent level (rank 0).
    Low,
    /// Default level (rank 1).
    #[default]
    Normal,
    /// Above-normal level (rank 2).
    High,
    /// Most urgent level (rank 3).
    Critical,
}

impl JobPriority {
    /// Numeric rank backing the ordering; a larger value means more urgent.
    fn ordinal(self) -> u8 {
        match self {
            JobPriority::Low => 0,
            JobPriority::Normal => 1,
            JobPriority::High => 2,
            JobPriority::Critical => 3,
        }
    }
}

impl PartialOrd for JobPriority {
    /// Always `Some`: priorities are totally ordered, so this defers to `Ord`.
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

impl Ord for JobPriority {
    /// Compares two priorities by their numeric rank.
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        let own_rank = self.ordinal();
        let rhs_rank = rhs.ordinal();
        own_rank.cmp(&rhs_rank)
    }
}
/// A progress report for a single job.
///
/// Passed to `JobScheduler::report_progress`, which stores it on the matching
/// job entry and exposes it again through `JobInfo::progress`.
#[derive(Debug, Clone)]
pub struct JobProgress {
/// Id of the job this report belongs to; used to look up the job entry.
pub job_id: String,
/// Completion percentage. `JobScheduler::report_progress` rejects values
/// outside `0.0..=100.0`.
pub percent: f64,
/// Optional free-form text stored alongside the percentage.
pub message: Option<String>,
}
/// Handle to a shared cancellation flag.
///
/// Cloning a token clones the inner `Arc`, so every clone reads and writes the
/// same flag: cancelling through one handle is visible through all of them.
#[derive(Debug, Clone)]
pub struct CancellationToken {
    /// Shared flag; `true` once `cancel` has been called on any clone.
    flag: Arc<AtomicBool>,
}

impl CancellationToken {
    /// Creates a token whose flag starts out unset (not cancelled).
    pub fn new() -> Self {
        let flag = Arc::new(AtomicBool::new(false));
        CancellationToken { flag }
    }

    /// Returns `true` once `cancel` has been called on this token or a clone.
    pub fn is_cancelled(&self) -> bool {
        let shared: &AtomicBool = &self.flag;
        shared.load(Ordering::Acquire)
    }

    /// Sets the shared flag. There is no way to unset it again.
    pub fn cancel(&self) {
        let shared: &AtomicBool = &self.flag;
        shared.store(true, Ordering::Release);
    }
}

impl Default for CancellationToken {
    /// Same as [`CancellationToken::new`]: a fresh, non-cancelled token.
    fn default() -> Self {
        CancellationToken::new()
    }
}
/// Retry configuration attached to a job.
///
/// NOTE(review): within this file only `max_retries` is consulted (when a job
/// attempt fails); `delay_ms` and `backoff_multiplier` are stored but not read
/// here — presumably they are interpreted by a caller elsewhere.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Maximum number of retries before a failing job is marked as failed.
    pub max_retries: u32,
    /// Delay between attempts, in milliseconds (going by the field name).
    pub delay_ms: u64,
    /// Multiplier applied to the delay between successive attempts
    /// (going by the field name).
    pub backoff_multiplier: f64,
}

impl Default for RetryPolicy {
    /// Three retries, a 1000 ms delay and a backoff multiplier of 2.0.
    fn default() -> Self {
        let max_retries = 3;
        let delay_ms = 1_000;
        let backoff_multiplier = 2.0;
        RetryPolicy {
            max_retries,
            delay_ms,
            backoff_multiplier,
        }
    }
}
/// Lifecycle state recorded for a job in the scheduler.
#[derive(Debug, Clone, PartialEq)]
pub enum JobStatus {
/// Registered but not (yet) dispatched. This is the initial state of a
/// scheduled job and the state a paused job returns to on `resume`.
Queued,
/// Handed to the worker pool; set right before the worker request is made.
Running,
/// The worker request succeeded and its output was stored as a result.
Completed,
/// The attempt failed and no retry was granted (no retry policy, or the
/// retry budget is used up).
Failed,
/// Cancelled through `JobScheduler::cancel`, or scheduled with a token that
/// was already cancelled.
Cancelled,
/// Set by `JobScheduler::pause`; `JobScheduler::resume` moves it back to
/// `Queued`.
Paused,
/// The attempt failed and the job's retry policy still allows another one.
Retrying {
/// Retry counter after the failed attempt was counted (starts at 1).
attempt: u32,
},
}
/// A unit of work that can be handed to the scheduler.
///
/// The job itself must be serializable and sendable across threads; the
/// scheduler passes it by value to the worker pool.
pub trait BackgroundJob: Send + Serialize + 'static {
/// Value produced by the job. The scheduler serializes it to JSON before
/// storing it as the job's result.
type Output: Serialize + for<'de> Deserialize<'de> + Send + 'static;
/// Identifier of this job; the scheduler uses it as the registry key and
/// rejects a second job with the same id.
fn id(&self) -> &str;
}
/// Scheduling options for a job, assembled with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct JobDescriptor {
    /// Identifier given to [`JobDescriptor::new`].
    pub id: String,
    /// Priority recorded for the job; defaults to `JobPriority::default()`.
    pub priority: JobPriority,
    /// Retry configuration; `None` means a failed attempt is not retried.
    pub retry_policy: Option<RetryPolicy>,
    /// Token through which the job can be cancelled.
    pub cancellation_token: CancellationToken,
    /// Ids of jobs that must be completed before this one is dispatched.
    pub dependencies: Vec<String>,
}

impl JobDescriptor {
    /// Creates a descriptor with default priority, no retry policy, a fresh
    /// cancellation token and no dependencies.
    pub fn new(id: impl Into<String>) -> Self {
        let id: String = id.into();
        JobDescriptor {
            id,
            priority: JobPriority::default(),
            retry_policy: None,
            cancellation_token: CancellationToken::new(),
            dependencies: Vec::new(),
        }
    }

    /// Returns the descriptor with its priority replaced.
    pub fn with_priority(self, priority: JobPriority) -> Self {
        JobDescriptor { priority, ..self }
    }

    /// Returns the descriptor with the given retry policy enabled.
    pub fn with_retry_policy(self, policy: RetryPolicy) -> Self {
        JobDescriptor {
            retry_policy: Some(policy),
            ..self
        }
    }

    /// Returns the descriptor using `token` instead of its own token.
    pub fn with_cancellation_token(self, token: CancellationToken) -> Self {
        JobDescriptor {
            cancellation_token: token,
            ..self
        }
    }

    /// Returns the descriptor with its dependency list replaced by `deps`.
    pub fn with_dependencies(self, deps: Vec<String>) -> Self {
        JobDescriptor {
            dependencies: deps,
            ..self
        }
    }
}
/// Public, owned snapshot of a job's bookkeeping data.
///
/// Built from the scheduler's internal entry; later changes to the job are not
/// reflected in an existing snapshot.
#[derive(Debug, Clone)]
pub struct JobInfo {
/// Id under which the job is registered.
pub id: String,
/// Status at the time the snapshot was taken.
pub status: JobStatus,
/// Priority the job was scheduled with.
pub priority: JobPriority,
/// Most recent progress report, if any was recorded.
pub progress: Option<JobProgress>,
/// Number of retries granted so far.
pub retry_count: u32,
/// When the job was registered.
pub created_at: Instant,
/// When the job was moved to `Running`, if it ever was.
pub started_at: Option<Instant>,
/// When the job reached `Completed`, `Failed` or `Cancelled`, if it has.
pub completed_at: Option<Instant>,
}
/// Internal bookkeeping record kept per job id inside the scheduler.
#[derive(Debug, Clone)]
struct JobEntry {
    /// Current lifecycle state.
    status: JobStatus,
    /// Priority taken from the descriptor.
    priority: JobPriority,
    /// Latest progress report, if any.
    progress: Option<JobProgress>,
    /// Number of retries granted so far.
    retry_count: u32,
    /// Retry configuration; `None` disables retries.
    retry_policy: Option<RetryPolicy>,
    /// Token triggered when the job is cancelled through the scheduler.
    cancellation_token: CancellationToken,
    /// Ids of jobs that must be `Completed` before this one is dispatched.
    dependencies: Vec<String>,
    /// Registration time.
    created_at: Instant,
    /// Time the job was moved to `Running`.
    started_at: Option<Instant>,
    /// Time the job reached a final state.
    completed_at: Option<Instant>,
}

impl JobEntry {
    /// Builds the public snapshot for this entry, labelled with `id`.
    ///
    /// Retry policy, cancellation token and dependencies are internal and are
    /// not copied into the snapshot.
    fn to_info(&self, id: &str) -> JobInfo {
        let JobEntry {
            status,
            priority,
            progress,
            retry_count,
            created_at,
            started_at,
            completed_at,
            ..
        } = self;
        JobInfo {
            id: id.to_owned(),
            status: status.clone(),
            priority: *priority,
            progress: progress.clone(),
            retry_count: *retry_count,
            created_at: *created_at,
            started_at: *started_at,
            completed_at: *completed_at,
        }
    }
}
pub struct JobScheduler {
worker_pool: Option<WorkerPool>,
entries: Arc<Mutex<HashMap<String, JobEntry>>>,
results: Arc<Mutex<HashMap<String, serde_json::Value>>>,
running_count: Arc<AtomicUsize>,
max_concurrent: usize,
}
impl JobScheduler {
pub fn new() -> Self {
Self {
worker_pool: None,
entries: Arc::new(Mutex::new(HashMap::new())),
results: Arc::new(Mutex::new(HashMap::new())),
running_count: Arc::new(AtomicUsize::new(0)),
max_concurrent: usize::MAX,
}
}
pub fn with_max_concurrent(mut self, max: usize) -> Self {
self.max_concurrent = max;
self
}
pub fn with_worker_pool(mut self, pool: WorkerPool) -> Self {
self.worker_pool = Some(pool);
self
}
pub fn schedule<Job>(&self, job: Job) -> Result<String>
where
Job: BackgroundJob,
{
let descriptor = JobDescriptor::new(job.id());
self.schedule_with_descriptor(job, descriptor)
}
pub fn schedule_with_descriptor<Job>(
&self,
job: Job,
descriptor: JobDescriptor,
) -> Result<String>
where
Job: BackgroundJob,
{
let id = job.id().to_string();
{
let mut entries = self.entries.lock().unwrap();
if entries.contains_key(&id) {
return Err(anyhow!("job already exists: {}", id));
}
let entry = JobEntry {
status: JobStatus::Queued,
priority: descriptor.priority,
progress: None,
retry_count: 0,
retry_policy: descriptor.retry_policy,
cancellation_token: descriptor.cancellation_token.clone(),
dependencies: descriptor.dependencies.clone(),
created_at: Instant::now(),
started_at: None,
completed_at: None,
};
entries.insert(id.clone(), entry);
}
if !self.dependencies_met(&id) {
return Ok(id);
}
if descriptor.cancellation_token.is_cancelled() {
let mut entries = self.entries.lock().unwrap();
if let Some(entry) = entries.get_mut(&id) {
entry.status = JobStatus::Cancelled;
entry.completed_at = Some(Instant::now());
}
return Ok(id);
}
self.try_execute(&id, job)?;
Ok(id)
}
fn try_execute<Job>(&self, id: &str, job: Job) -> Result<()>
where
Job: BackgroundJob,
{
let current = self.running_count.load(Ordering::Acquire);
if current >= self.max_concurrent {
return Ok(());
}
self.running_count.fetch_add(1, Ordering::AcqRel);
{
let mut entries = self.entries.lock().unwrap();
if let Some(entry) = entries.get_mut(id) {
entry.status = JobStatus::Running;
entry.started_at = Some(Instant::now());
}
}
let result: Result<Job::Output> = if let Some(ref pool) = self.worker_pool {
pool.request(job).context("worker request failed")
} else {
Err(anyhow!("no worker pool configured"))
};
self.running_count.fetch_sub(1, Ordering::AcqRel);
let mut entries = self.entries.lock().unwrap();
let mut results = self.results.lock().unwrap();
match result {
Ok(output) => {
let value = serde_json::to_value(output).context("failed to serialize result")?;
results.insert(id.to_string(), value);
if let Some(entry) = entries.get_mut(id) {
entry.status = JobStatus::Completed;
entry.completed_at = Some(Instant::now());
}
}
Err(_) => {
if let Some(entry) = entries.get_mut(id) {
let should_retry = entry
.retry_policy
.as_ref()
.map(|p| entry.retry_count < p.max_retries)
.unwrap_or(false);
if should_retry {
entry.retry_count += 1;
entry.status = JobStatus::Retrying {
attempt: entry.retry_count,
};
} else {
entry.status = JobStatus::Failed;
entry.completed_at = Some(Instant::now());
}
}
}
}
Ok(())
}
pub fn cancel(&self, job_id: &str) -> Result<()> {
let mut entries = self.entries.lock().unwrap();
match entries.get_mut(job_id) {
Some(entry) => match &entry.status {
JobStatus::Queued
| JobStatus::Running
| JobStatus::Paused
| JobStatus::Retrying { .. } => {
entry.cancellation_token.cancel();
entry.status = JobStatus::Cancelled;
entry.completed_at = Some(Instant::now());
Ok(())
}
status => Err(anyhow!("job cannot be cancelled: {:?}", status)),
},
None => Err(anyhow!("job not found: {}", job_id)),
}
}
pub fn pause(&self, job_id: &str) -> Result<()> {
let mut entries = self.entries.lock().unwrap();
match entries.get_mut(job_id) {
Some(entry) => match entry.status {
JobStatus::Queued | JobStatus::Running => {
entry.status = JobStatus::Paused;
Ok(())
}
_ => Err(anyhow!("job cannot be paused in state: {:?}", entry.status)),
},
None => Err(anyhow!("job not found: {}", job_id)),
}
}
pub fn resume(&self, job_id: &str) -> Result<()> {
let mut entries = self.entries.lock().unwrap();
match entries.get_mut(job_id) {
Some(entry) if entry.status == JobStatus::Paused => {
entry.status = JobStatus::Queued;
Ok(())
}
Some(entry) => Err(anyhow!(
"job cannot be resumed from state: {:?}",
entry.status
)),
None => Err(anyhow!("job not found: {}", job_id)),
}
}
pub fn report_progress(&self, progress: JobProgress) -> Result<()> {
let mut entries = self.entries.lock().unwrap();
match entries.get_mut(&progress.job_id) {
Some(entry) if entry.status == JobStatus::Running => {
if !(0.0..=100.0).contains(&progress.percent) {
return Err(anyhow!(
"progress percent must be in 0..=100, got {}",
progress.percent
));
}
entry.progress = Some(progress);
Ok(())
}
Some(_) => Err(anyhow!("can only report progress for running jobs")),
None => Err(anyhow!("job not found: {}", progress.job_id)),
}
}
pub fn status(&self, job_id: &str) -> Option<JobStatus> {
self.entries
.lock()
.unwrap()
.get(job_id)
.map(|e| e.status.clone())
}
pub fn result(&self, job_id: &str) -> Option<serde_json::Value> {
self.results.lock().unwrap().get(job_id).cloned()
}
pub fn all_statuses(&self) -> HashMap<String, JobStatus> {
self.entries
.lock()
.unwrap()
.iter()
.map(|(k, v)| (k.clone(), v.status.clone()))
.collect()
}
pub fn jobs(&self) -> Vec<JobInfo> {
let entries = self.entries.lock().unwrap();
let mut infos: Vec<JobInfo> = entries
.iter()
.map(|(id, entry)| entry.to_info(id))
.collect();
infos.sort_by_key(|b| std::cmp::Reverse(b.priority));
infos
}
pub fn job_info(&self, job_id: &str) -> Option<JobInfo> {
self.entries
.lock()
.unwrap()
.get(job_id)
.map(|e| e.to_info(job_id))
}
pub fn running_count(&self) -> usize {
self.running_count.load(Ordering::Acquire)
}
pub fn max_concurrent(&self) -> usize {
self.max_concurrent
}
fn dependencies_met(&self, job_id: &str) -> bool {
let entries = self.entries.lock().unwrap();
let deps = match entries.get(job_id) {
Some(entry) => entry.dependencies.clone(),
None => return false,
};
deps.iter().all(|dep_id| {
entries
.get(dep_id)
.map(|e| e.status == JobStatus::Completed)
.unwrap_or(false)
})
}
}
impl Default for JobScheduler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal job used to drive the scheduler in tests.
    #[derive(Serialize)]
    struct TestJob {
        id: String,
    }

    impl BackgroundJob for TestJob {
        type Output = String;

        fn id(&self) -> &str {
            &self.id
        }
    }

    /// Builds a `TestJob` with the given id.
    fn job(id: &str) -> TestJob {
        TestJob { id: id.to_string() }
    }

    /// Builds a baseline entry in `status`: normal priority, no progress, no
    /// retries, a fresh token and no dependencies. Timestamps are derived from
    /// the status (started unless queued/paused, completed when final).
    /// Tests override individual fields with struct-update syntax.
    fn entry_in(status: JobStatus) -> JobEntry {
        let now = Instant::now();
        let started = !matches!(status, JobStatus::Queued | JobStatus::Paused);
        let finished = matches!(
            status,
            JobStatus::Completed | JobStatus::Failed | JobStatus::Cancelled
        );
        JobEntry {
            status,
            priority: JobPriority::Normal,
            progress: None,
            retry_count: 0,
            retry_policy: None,
            cancellation_token: CancellationToken::new(),
            dependencies: vec![],
            created_at: now,
            started_at: started.then_some(now),
            completed_at: finished.then_some(now),
        }
    }

    /// Puts `entry` straight into the scheduler's registry under `id`.
    fn register(scheduler: &JobScheduler, id: &str, entry: JobEntry) {
        scheduler
            .entries
            .lock()
            .unwrap()
            .insert(id.to_string(), entry);
    }

    /// Creates a scheduler holding one baseline entry `id` in `status`.
    fn scheduler_with(id: &str, status: JobStatus) -> JobScheduler {
        let scheduler = JobScheduler::new();
        register(&scheduler, id, entry_in(status));
        scheduler
    }

    #[test]
    fn test_job_scheduler_without_worker_pool() {
        let scheduler = JobScheduler::new();
        let id = scheduler.schedule(job("job1")).unwrap();
        assert_eq!(scheduler.status(&id), Some(JobStatus::Failed));
    }

    #[test]
    fn test_job_cancel() {
        let scheduler = scheduler_with("job1", JobStatus::Queued);
        assert!(scheduler.cancel("job1").is_ok());
        assert_eq!(scheduler.status("job1"), Some(JobStatus::Cancelled));
    }

    #[test]
    fn test_job_cancel_not_found() {
        let scheduler = JobScheduler::new();
        assert!(scheduler.cancel("missing").is_err());
    }

    #[test]
    fn test_job_cancel_completed() {
        let scheduler = scheduler_with("job1", JobStatus::Completed);
        assert!(scheduler.cancel("job1").is_err());
    }

    #[test]
    fn test_priority_ordering() {
        assert!(JobPriority::Critical > JobPriority::High);
        assert!(JobPriority::High > JobPriority::Normal);
        assert!(JobPriority::Normal > JobPriority::Low);
        assert!(JobPriority::Low < JobPriority::Critical);
    }

    #[test]
    fn test_priority_default_is_normal() {
        assert_eq!(JobPriority::default(), JobPriority::Normal);
    }

    #[test]
    fn test_cancellation_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());
        let clone = token.clone();
        token.cancel();
        assert!(token.is_cancelled());
        assert!(clone.is_cancelled());
    }

    #[test]
    fn test_cancellation_token_default() {
        let token = CancellationToken::default();
        assert!(!token.is_cancelled());
    }

    #[test]
    fn test_retry_policy_default() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_retries, 3);
        assert_eq!(policy.delay_ms, 1000);
        assert!((policy.backoff_multiplier - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_job_descriptor_builder() {
        let token = CancellationToken::new();
        let descriptor = JobDescriptor::new("test-job")
            .with_priority(JobPriority::High)
            .with_retry_policy(RetryPolicy {
                max_retries: 5,
                delay_ms: 500,
                backoff_multiplier: 1.5,
            })
            .with_cancellation_token(token.clone())
            .with_dependencies(vec!["dep1".to_string(), "dep2".to_string()]);
        assert_eq!(descriptor.id, "test-job");
        assert_eq!(descriptor.priority, JobPriority::High);
        assert!(descriptor.retry_policy.is_some());
        assert_eq!(descriptor.retry_policy.unwrap().max_retries, 5);
        assert_eq!(descriptor.dependencies.len(), 2);
    }

    #[test]
    fn test_job_status_paused_variant() {
        let status = JobStatus::Paused;
        assert_eq!(status, JobStatus::Paused);
        assert_ne!(status, JobStatus::Queued);
    }

    #[test]
    fn test_job_status_retrying_variant() {
        let status = JobStatus::Retrying { attempt: 2 };
        assert_eq!(status, JobStatus::Retrying { attempt: 2 });
        assert_ne!(status, JobStatus::Retrying { attempt: 1 });
        assert_ne!(status, JobStatus::Running);
    }

    #[test]
    fn test_pause_and_resume() {
        let scheduler = scheduler_with("pausable", JobStatus::Queued);
        scheduler.pause("pausable").unwrap();
        assert_eq!(scheduler.status("pausable"), Some(JobStatus::Paused));
        scheduler.resume("pausable").unwrap();
        assert_eq!(scheduler.status("pausable"), Some(JobStatus::Queued));
    }

    #[test]
    fn test_pause_invalid_state() {
        let scheduler = scheduler_with("completed-job", JobStatus::Completed);
        assert!(scheduler.pause("completed-job").is_err());
    }

    #[test]
    fn test_resume_invalid_state() {
        let scheduler = scheduler_with("queued-job", JobStatus::Queued);
        assert!(scheduler.resume("queued-job").is_err());
    }

    #[test]
    fn test_pause_not_found() {
        let scheduler = JobScheduler::new();
        assert!(scheduler.pause("ghost").is_err());
    }

    #[test]
    fn test_resume_not_found() {
        let scheduler = JobScheduler::new();
        assert!(scheduler.resume("ghost").is_err());
    }

    #[test]
    fn test_report_progress() {
        let scheduler = scheduler_with("running-job", JobStatus::Running);
        let progress = JobProgress {
            job_id: "running-job".to_string(),
            percent: 50.0,
            message: Some("halfway there".to_string()),
        };
        scheduler.report_progress(progress).unwrap();
        let info = scheduler.job_info("running-job").unwrap();
        let p = info.progress.unwrap();
        assert!((p.percent - 50.0).abs() < f64::EPSILON);
        assert_eq!(p.message, Some("halfway there".to_string()));
    }

    #[test]
    fn test_report_progress_invalid_percent() {
        let scheduler = scheduler_with("running-job", JobStatus::Running);
        let progress = JobProgress {
            job_id: "running-job".to_string(),
            percent: 150.0,
            message: None,
        };
        assert!(scheduler.report_progress(progress).is_err());
    }

    #[test]
    fn test_report_progress_wrong_state() {
        let scheduler = scheduler_with("queued-job", JobStatus::Queued);
        let progress = JobProgress {
            job_id: "queued-job".to_string(),
            percent: 10.0,
            message: None,
        };
        assert!(scheduler.report_progress(progress).is_err());
    }

    #[test]
    fn test_report_progress_not_found() {
        let scheduler = JobScheduler::new();
        let progress = JobProgress {
            job_id: "ghost".to_string(),
            percent: 10.0,
            message: None,
        };
        assert!(scheduler.report_progress(progress).is_err());
    }

    #[test]
    fn test_jobs_sorted_by_priority() {
        let scheduler = JobScheduler::new();
        for (id, priority) in [
            ("low-job", JobPriority::Low),
            ("high-job", JobPriority::High),
            ("normal-job", JobPriority::Normal),
            ("critical-job", JobPriority::Critical),
        ] {
            register(
                &scheduler,
                id,
                JobEntry {
                    priority,
                    ..entry_in(JobStatus::Queued)
                },
            );
        }
        let jobs = scheduler.jobs();
        assert_eq!(jobs.len(), 4);
        assert_eq!(jobs[0].priority, JobPriority::Critical);
        assert_eq!(jobs[1].priority, JobPriority::High);
        assert_eq!(jobs[2].priority, JobPriority::Normal);
        assert_eq!(jobs[3].priority, JobPriority::Low);
    }

    #[test]
    fn test_job_info_fields() {
        let scheduler = JobScheduler::new();
        let id = "info-job";
        register(
            &scheduler,
            id,
            JobEntry {
                priority: JobPriority::High,
                progress: Some(JobProgress {
                    job_id: id.to_string(),
                    percent: 75.0,
                    message: Some("almost done".to_string()),
                }),
                retry_count: 1,
                retry_policy: Some(RetryPolicy::default()),
                ..entry_in(JobStatus::Running)
            },
        );
        let info = scheduler.job_info(id).unwrap();
        assert_eq!(info.id, id);
        assert_eq!(info.status, JobStatus::Running);
        assert_eq!(info.priority, JobPriority::High);
        assert_eq!(info.retry_count, 1);
        assert!(info.started_at.is_some());
        assert!(info.completed_at.is_none());
        assert!(info.progress.is_some());
    }

    #[test]
    fn test_job_info_not_found() {
        let scheduler = JobScheduler::new();
        assert!(scheduler.job_info("nope").is_none());
    }

    #[test]
    fn test_bounded_concurrency_tracking() {
        let scheduler = JobScheduler::new().with_max_concurrent(2);
        assert_eq!(scheduler.max_concurrent(), 2);
        assert_eq!(scheduler.running_count(), 0);
    }

    #[test]
    fn test_bounded_concurrency_blocks_excess() {
        // A limit of zero means the job is registered but never dispatched.
        let scheduler = JobScheduler::new().with_max_concurrent(0);
        let id = scheduler.schedule(job("blocked")).unwrap();
        assert_eq!(scheduler.status(&id), Some(JobStatus::Queued));
    }

    #[test]
    fn test_schedule_with_descriptor() {
        let scheduler = JobScheduler::new();
        let descriptor = JobDescriptor::new("desc-job").with_priority(JobPriority::Critical);
        let id = scheduler
            .schedule_with_descriptor(job("desc-job"), descriptor)
            .unwrap();
        let info = scheduler.job_info(&id).unwrap();
        assert_eq!(info.priority, JobPriority::Critical);
    }

    #[test]
    fn test_schedule_duplicate_rejected() {
        let scheduler = JobScheduler::new();
        scheduler.schedule(job("dup")).unwrap();
        assert!(scheduler.schedule(job("dup")).is_err());
    }

    #[test]
    fn test_dependencies_block_execution() {
        // "parent" is unknown to the scheduler, so "child" must stay queued.
        let scheduler = JobScheduler::new();
        let descriptor =
            JobDescriptor::new("child").with_dependencies(vec!["parent".to_string()]);
        let id = scheduler
            .schedule_with_descriptor(job("child"), descriptor)
            .unwrap();
        assert_eq!(scheduler.status(&id), Some(JobStatus::Queued));
    }

    #[test]
    fn test_dependencies_met_when_parent_completed() {
        let scheduler = scheduler_with("parent", JobStatus::Completed);
        let descriptor =
            JobDescriptor::new("child").with_dependencies(vec!["parent".to_string()]);
        let id = scheduler
            .schedule_with_descriptor(job("child"), descriptor)
            .unwrap();
        assert_ne!(scheduler.status(&id), Some(JobStatus::Queued));
    }

    #[test]
    fn test_cancel_triggers_token() {
        let scheduler = JobScheduler::new();
        let token = CancellationToken::new();
        register(
            &scheduler,
            "token-job",
            JobEntry {
                cancellation_token: token.clone(),
                ..entry_in(JobStatus::Running)
            },
        );
        assert!(!token.is_cancelled());
        scheduler.cancel("token-job").unwrap();
        assert!(token.is_cancelled());
        assert_eq!(scheduler.status("token-job"), Some(JobStatus::Cancelled));
    }

    #[test]
    fn test_cancel_paused_job() {
        let scheduler = scheduler_with("paused-cancel", JobStatus::Paused);
        assert!(scheduler.cancel("paused-cancel").is_ok());
        assert_eq!(
            scheduler.status("paused-cancel"),
            Some(JobStatus::Cancelled)
        );
    }

    #[test]
    fn test_cancel_retrying_job() {
        let scheduler = JobScheduler::new();
        register(
            &scheduler,
            "retrying-cancel",
            JobEntry {
                retry_count: 2,
                retry_policy: Some(RetryPolicy::default()),
                ..entry_in(JobStatus::Retrying { attempt: 2 })
            },
        );
        assert!(scheduler.cancel("retrying-cancel").is_ok());
        assert_eq!(
            scheduler.status("retrying-cancel"),
            Some(JobStatus::Cancelled)
        );
    }

    #[test]
    fn test_schedule_with_pre_cancelled_token() {
        let scheduler = JobScheduler::new();
        let token = CancellationToken::new();
        token.cancel();
        let descriptor = JobDescriptor::new("pre-cancelled").with_cancellation_token(token);
        let id = scheduler
            .schedule_with_descriptor(job("pre-cancelled"), descriptor)
            .unwrap();
        assert_eq!(scheduler.status(&id), Some(JobStatus::Cancelled));
    }

    #[test]
    fn test_retry_policy_on_failure() {
        // No worker pool is configured, so the first attempt fails and the
        // retry policy grants retry number one.
        let scheduler = JobScheduler::new();
        let descriptor = JobDescriptor::new("retry-me").with_retry_policy(RetryPolicy {
            max_retries: 3,
            delay_ms: 100,
            backoff_multiplier: 1.0,
        });
        let id = scheduler
            .schedule_with_descriptor(job("retry-me"), descriptor)
            .unwrap();
        assert_eq!(
            scheduler.status(&id),
            Some(JobStatus::Retrying { attempt: 1 })
        );
        let info = scheduler.job_info(&id).unwrap();
        assert_eq!(info.retry_count, 1);
    }

    #[test]
    fn test_all_statuses() {
        let scheduler = JobScheduler::new();
        register(
            &scheduler,
            "a",
            JobEntry {
                priority: JobPriority::Low,
                ..entry_in(JobStatus::Queued)
            },
        );
        register(
            &scheduler,
            "b",
            JobEntry {
                priority: JobPriority::High,
                ..entry_in(JobStatus::Completed)
            },
        );
        let statuses = scheduler.all_statuses();
        assert_eq!(statuses.len(), 2);
        assert_eq!(statuses["a"], JobStatus::Queued);
        assert_eq!(statuses["b"], JobStatus::Completed);
    }

    #[test]
    fn test_pause_running_job() {
        let scheduler = scheduler_with("running-pause", JobStatus::Running);
        scheduler.pause("running-pause").unwrap();
        assert_eq!(scheduler.status("running-pause"), Some(JobStatus::Paused));
    }

    #[test]
    fn test_completed_at_set_on_cancel() {
        let scheduler = scheduler_with("cancel-ts", JobStatus::Queued);
        scheduler.cancel("cancel-ts").unwrap();
        let info = scheduler.job_info("cancel-ts").unwrap();
        assert!(info.completed_at.is_some());
    }
}