use super::{
BackgroundJobConfig, BackgroundJobError, BackgroundJobRecord, BackgroundJobResult,
BackgroundJobState, BackgroundJobTracker,
};
impl BackgroundJobTracker {
pub fn new(config: BackgroundJobConfig) -> BackgroundJobResult<Self> {
if config.max_queued_jobs == 0 {
return Err(BackgroundJobError::InvalidInput(
"max_queued_jobs must be > 0".to_string(),
));
}
if config.max_retained_terminal_jobs == 0 {
return Err(BackgroundJobError::InvalidInput(
"max_retained_terminal_jobs must be > 0".to_string(),
));
}
Ok(Self {
config,
next_job_id: 1,
is_shutdown: false,
queued: std::collections::VecDeque::new(),
jobs: std::collections::HashMap::new(),
terminal_order: std::collections::VecDeque::new(),
})
}
pub fn submit(&mut self, job_kind: &str) -> BackgroundJobResult<u64> {
if self.is_shutdown {
return Err(BackgroundJobError::Shutdown);
}
if self.queued.len() >= self.config.max_queued_jobs {
return Err(BackgroundJobError::Backpressure {
queued_jobs: self.queued.len(),
max_queued_jobs: self.config.max_queued_jobs,
});
}
if job_kind.trim().is_empty() {
return Err(BackgroundJobError::InvalidInput(
"job_kind must not be empty".to_string(),
));
}
let job_id = self.next_job_id;
self.next_job_id += 1;
self.jobs.insert(
job_id,
BackgroundJobRecord {
job_id,
job_kind: job_kind.to_string(),
attempts: 0,
state: BackgroundJobState::Queued,
},
);
self.queued.push_back(job_id);
Ok(job_id)
}
pub fn start_next(&mut self) -> Option<u64> {
let job_id = self.queued.pop_front()?;
let record = self.jobs.get_mut(&job_id)?;
record.attempts = record.attempts.saturating_add(1);
record.state = BackgroundJobState::Running;
Some(job_id)
}
pub fn mark_succeeded(&mut self, job_id: u64) -> BackgroundJobResult<()> {
let record = self
.jobs
.get_mut(&job_id)
.ok_or(BackgroundJobError::NotFound(job_id))?;
if !matches!(record.state, BackgroundJobState::Running) {
return Err(BackgroundJobError::InvalidTransition(format!(
"job {} must be running to mark succeeded",
job_id
)));
}
record.state = BackgroundJobState::Succeeded;
self.record_terminal(job_id);
Ok(())
}
pub fn retry_running(&mut self, job_id: u64) -> BackgroundJobResult<u32> {
let record = self
.jobs
.get_mut(&job_id)
.ok_or(BackgroundJobError::NotFound(job_id))?;
if !matches!(record.state, BackgroundJobState::Running) {
return Err(BackgroundJobError::InvalidTransition(format!(
"job {} must be running to retry",
job_id
)));
}
record.attempts = record.attempts.saturating_add(1);
Ok(record.attempts)
}
pub fn mark_failed(&mut self, job_id: u64, reason: &str) -> BackgroundJobResult<()> {
let record = self
.jobs
.get_mut(&job_id)
.ok_or(BackgroundJobError::NotFound(job_id))?;
if !matches!(record.state, BackgroundJobState::Running) {
return Err(BackgroundJobError::InvalidTransition(format!(
"job {} must be running to mark failed",
job_id
)));
}
record.state = BackgroundJobState::Failed {
reason: reason.to_string(),
};
self.record_terminal(job_id);
Ok(())
}
pub fn cancel(&mut self, job_id: u64) -> BackgroundJobResult<()> {
let record = self
.jobs
.get_mut(&job_id)
.ok_or(BackgroundJobError::NotFound(job_id))?;
if !matches!(
record.state,
BackgroundJobState::Queued | BackgroundJobState::Running
) {
return Err(BackgroundJobError::InvalidTransition(format!(
"job {} must be queued/running to cancel",
job_id
)));
}
record.state = BackgroundJobState::Canceled;
self.queued.retain(|queued_id| *queued_id != job_id);
self.record_terminal(job_id);
Ok(())
}
pub fn request_shutdown(&mut self) -> usize {
self.is_shutdown = true;
let pending = self.queued.drain(..).collect::<Vec<u64>>();
for job_id in &pending {
if let Some(record) = self.jobs.get_mut(job_id) {
if matches!(record.state, BackgroundJobState::Queued) {
record.state = BackgroundJobState::Canceled;
self.record_terminal(*job_id);
}
}
}
pending.len()
}
pub fn status(&self, job_id: u64) -> Option<&BackgroundJobRecord> {
self.jobs.get(&job_id)
}
pub fn snapshot(&self) -> Vec<BackgroundJobRecord> {
let mut records = self
.jobs
.values()
.cloned()
.collect::<Vec<BackgroundJobRecord>>();
records.sort_by_key(|record| record.job_id);
records
}
pub fn queued_jobs(&self) -> usize {
self.queued.len()
}
pub fn is_shutdown(&self) -> bool {
self.is_shutdown
}
fn record_terminal(&mut self, job_id: u64) {
self.terminal_order.push_back(job_id);
while self.terminal_order.len() > self.config.max_retained_terminal_jobs {
if let Some(evict_id) = self.terminal_order.pop_front() {
self.jobs.remove(&evict_id);
}
}
}
}