use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
pub const MAX_CONCURRENT_JOBS: u32 = 16;
#[derive(Debug, Clone, Serialize)]
pub struct BackgroundJob {
pub job_id: String,
pub capability: String,
pub status: String,
pub started_at: u64,
pub finished_at: Option<u64>,
pub result: Option<String>,
}
pub struct BackgroundJobRegistry {
jobs: std::sync::RwLock<HashMap<String, BackgroundJob>>,
running: AtomicU32,
}
impl BackgroundJobRegistry {
pub fn new() -> Self {
Self {
jobs: std::sync::RwLock::new(HashMap::new()),
running: AtomicU32::new(0),
}
}
pub fn try_reserve(&self) -> bool {
#[allow(clippy::arithmetic_side_effects)] self.running
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
if n < MAX_CONCURRENT_JOBS {
Some(n + 1)
} else {
None
}
})
.is_ok()
}
pub fn release(&self) {
self.running.fetch_sub(1, Ordering::SeqCst);
}
pub fn insert(&self, job: BackgroundJob) {
self.jobs
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(job.job_id.clone(), job);
}
pub fn get(&self, job_id: &str) -> Option<BackgroundJob> {
self.jobs
.read()
.unwrap_or_else(|e| e.into_inner())
.get(job_id)
.cloned()
}
pub fn update(&self, job_id: &str, status: &str, result: Option<String>, finished_at: u64) {
let mut jobs = self.jobs.write().unwrap_or_else(|e| e.into_inner());
if let Some(job) = jobs.get_mut(job_id) {
job.status = status.to_string();
job.finished_at = Some(finished_at);
job.result = result;
}
}
pub fn list(&self, limit: usize) -> Vec<BackgroundJob> {
let jobs = self.jobs.read().unwrap_or_else(|e| e.into_inner());
let mut v: Vec<_> = jobs.values().cloned().collect();
v.sort_by_key(|j| j.started_at);
v.reverse();
v.truncate(limit);
v
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
#[test]
fn test_background_job_registry_new_is_empty() {
let reg = BackgroundJobRegistry::new();
assert!(reg.list(10).is_empty());
}
#[test]
fn test_background_job_registry_try_reserve() {
let reg = BackgroundJobRegistry::new();
for _ in 0..MAX_CONCURRENT_JOBS {
assert!(reg.try_reserve());
}
assert!(!reg.try_reserve());
}
#[test]
fn test_background_job_registry_release_allows_re_reserve() {
let reg = BackgroundJobRegistry::new();
for _ in 0..MAX_CONCURRENT_JOBS {
assert!(reg.try_reserve());
}
reg.release();
assert!(reg.try_reserve());
}
#[test]
fn test_background_job_registry_insert_and_get() {
let reg = BackgroundJobRegistry::new();
let job = BackgroundJob {
job_id: "test-123".into(),
capability: "FileRead".into(),
status: "running".into(),
started_at: 1000,
finished_at: None,
result: None,
};
reg.insert(job);
let retrieved = reg.get("test-123").unwrap();
assert_eq!(retrieved.job_id, "test-123");
assert_eq!(retrieved.capability, "FileRead");
}
#[test]
fn test_background_job_registry_update() {
let reg = BackgroundJobRegistry::new();
let job = BackgroundJob {
job_id: "test-123".into(),
capability: "FileRead".into(),
status: "running".into(),
started_at: 1000,
finished_at: None,
result: None,
};
reg.insert(job);
reg.update("test-123", "completed", Some("success".into()), 2000);
let retrieved = reg.get("test-123").unwrap();
assert_eq!(retrieved.status, "completed");
assert_eq!(retrieved.finished_at, Some(2000));
assert_eq!(retrieved.result, Some("success".into()));
}
#[test]
fn test_background_job_registry_list() {
let reg = BackgroundJobRegistry::new();
for i in 0..5 {
let job = BackgroundJob {
job_id: format!("job-{}", i),
capability: "FileRead".into(),
status: "completed".into(),
started_at: 1000 + i,
finished_at: Some(2000 + i),
result: None,
};
reg.insert(job);
}
let jobs = reg.list(3);
assert_eq!(jobs.len(), 3);
assert_eq!(jobs.first().unwrap().job_id, "job-4");
assert_eq!(jobs.get(1).unwrap().job_id, "job-3");
assert_eq!(jobs.get(2).unwrap().job_id, "job-2");
}
}