use gflow::core::executor::Executor;
use gflow::core::job::{Job, JobBuilder, JobState};
use gflow::core::scheduler::SchedulerBuilder;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// Test double that keeps a log of job ids behind a shared, mutex-guarded
/// vector and carries a configurable delay.
///
/// Cloning is shallow with respect to the log: every clone holds the same
/// `Arc`, so a clone kept by the test can observe ids recorded through the
/// instance that was handed to the scheduler.
#[derive(Clone)]
struct DelayedMockExecutor {
    /// Ids recorded so far, in the order they were pushed.
    executions: Arc<Mutex<Vec<u32>>>,
    /// Pause applied by `execute` before it records a job id.
    execution_delay: Duration,
}

impl DelayedMockExecutor {
    /// Creates an executor with an empty log and the given `delay`.
    fn new(delay: Duration) -> Self {
        Self {
            // `Arc<Mutex<Vec<_>>>` defaults to an empty, unlocked vector.
            executions: Arc::default(),
            execution_delay: delay,
        }
    }

    /// Returns a snapshot (owned copy) of the ids recorded so far.
    ///
    /// Panics if the log mutex has been poisoned.
    fn get_executed_job_ids(&self) -> Vec<u32> {
        let recorded = self.executions.lock().unwrap();
        recorded.to_vec()
    }
}
impl Executor for DelayedMockExecutor {
    /// Sleeps for `execution_delay`, then appends `job.id` to the shared log.
    ///
    /// Always returns `Ok(())`; panics only if the log mutex is poisoned.
    fn execute(&self, job: &Job) -> anyhow::Result<()> {
        // Sleep first so the lock is never held for the duration of the delay.
        thread::sleep(self.execution_delay);

        let mut recorded = self.executions.lock().unwrap();
        recorded.push(job.id);
        Ok(())
    }
}
/// Cancels a job after `prepare_jobs_for_execution` has moved it to
/// `Running` but before the prepared batch is executed, then checks that
/// the job is still `Cancelled` once execution and failure handling are done.
#[test]
fn test_job_cancelled_between_prepare_and_execute() {
    let recorder = DelayedMockExecutor::new(Duration::from_millis(50));
    // The scheduler receives a clone; `recorder` shares its execution log.
    let mut scheduler = SchedulerBuilder::new()
        .with_executor(Box::new(recorder.clone()))
        .with_state_path(PathBuf::from("/tmp/test_cancel_race.json"))
        .with_total_memory_mb(16 * 1024)
        .build();

    let job = JobBuilder::new()
        .submitted_by("test")
        .run_dir("/tmp")
        .command("echo test")
        .build();
    let job_id = scheduler.submit_job(job).0;

    // Phase 1: prepare. Exactly our job is selected and reported as Running.
    let prepared = scheduler.prepare_jobs_for_execution();
    assert_eq!(prepared.len(), 1);
    assert_eq!(prepared[0].id, job_id);
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Running);

    // Phase 2: cancel while the prepared batch has not been executed yet.
    scheduler.cancel_job(job_id, None);
    assert_eq!(
        scheduler.get_job(job_id).unwrap().state,
        JobState::Cancelled
    );

    // Phase 3: execute the stale batch and let the scheduler process results.
    let outcomes = scheduler.execute_jobs_no_lock(&prepared);
    scheduler.handle_execution_failures(&outcomes);

    // Diagnostics only: the executed ids are printed, not asserted on.
    let executed_jobs = recorder.get_executed_job_ids();
    println!("Executed jobs: {:?}", executed_jobs);
    println!(
        "Job final state: {:?}",
        scheduler.get_job(job_id).unwrap().state
    );

    // The cancellation must not have been overwritten.
    assert_eq!(
        scheduler.get_job(job_id).unwrap().state,
        JobState::Cancelled
    );
}
/// Marks a job as failed after it has been prepared for execution but before
/// the prepared batch is executed, then checks that the job is still `Failed`
/// once execution and failure handling are done.
#[test]
fn test_job_failed_between_prepare_and_execute() {
    let recorder = DelayedMockExecutor::new(Duration::from_millis(50));
    // The scheduler receives a clone; `recorder` shares its execution log.
    let mut scheduler = SchedulerBuilder::new()
        .with_executor(Box::new(recorder.clone()))
        .with_state_path(PathBuf::from("/tmp/test_fail_race.json"))
        .with_total_memory_mb(16 * 1024)
        .build();

    let job = JobBuilder::new()
        .submitted_by("test")
        .run_dir("/tmp")
        .command("echo test")
        .build();
    let job_id = scheduler.submit_job(job).0;

    // Phase 1: prepare. A single job is expected in the batch.
    let prepared = scheduler.prepare_jobs_for_execution();
    assert_eq!(prepared.len(), 1);

    // Phase 2: fail the job before the prepared batch runs.
    scheduler.fail_job(job_id);
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Failed);

    // Phase 3: execute the stale batch and let the scheduler process results.
    let outcomes = scheduler.execute_jobs_no_lock(&prepared);
    scheduler.handle_execution_failures(&outcomes);

    // Diagnostics only: the executed ids are printed, not asserted on.
    let executed_jobs = recorder.get_executed_job_ids();
    println!("Executed jobs: {:?}", executed_jobs);

    // The failure must not have been overwritten.
    assert_eq!(scheduler.get_job(job_id).unwrap().state, JobState::Failed);
}
/// Submits five jobs, prepares all of them, cancels the second and fourth
/// before the batch is executed, and checks that those two are still
/// `Cancelled` after execution and failure handling.
#[test]
fn test_multiple_jobs_partial_cancellation() {
    let recorder = DelayedMockExecutor::new(Duration::from_millis(10));
    // The scheduler receives a clone; `recorder` shares its execution log.
    let mut scheduler = SchedulerBuilder::new()
        .with_executor(Box::new(recorder.clone()))
        .with_state_path(PathBuf::from("/tmp/test_partial_cancel.json"))
        .with_total_memory_mb(16 * 1024)
        .build();

    // Submit jobs 0..5 in order, keeping the id assigned to each.
    let job_ids: Vec<_> = (0..5)
        .map(|i| {
            let job = JobBuilder::new()
                .submitted_by("test")
                .run_dir("/tmp")
                .command(format!("echo job-{}", i))
                .build();
            scheduler.submit_job(job).0
        })
        .collect();

    let prepared = scheduler.prepare_jobs_for_execution();
    assert_eq!(prepared.len(), 5);

    // Cancel two of the five after preparation, in submission order.
    let cancelled = [job_ids[1], job_ids[3]];
    for &id in &cancelled {
        scheduler.cancel_job(id, None);
    }

    let outcomes = scheduler.execute_jobs_no_lock(&prepared);
    scheduler.handle_execution_failures(&outcomes);

    // Diagnostics only: the executed ids are printed, not asserted on.
    let executed_jobs = recorder.get_executed_job_ids();
    println!("Executed jobs: {:?}", executed_jobs);
    println!("Cancelled jobs: {:?}", cancelled);

    // Both cancellations must have survived the stale execution.
    for &id in &cancelled {
        assert_eq!(scheduler.get_job(id).unwrap().state, JobState::Cancelled);
    }
}