use noesis_ship::job_queue::{Job, JobQueue};
use serde::{Deserialize, Serialize};
pub use noesis_ship::job_queue::JobStatus;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrainingPayload {
pub experiment_id: String,
pub being: String,
pub corpus: String,
}
pub type TrainingJob = Job<TrainingPayload>;
#[derive(Debug, Serialize, Deserialize)]
pub struct TrainingQueue {
inner: JobQueue<TrainingPayload>,
}
impl TrainingQueue {
pub fn new() -> Self {
Self {
inner: JobQueue::new("TRAIN"),
}
}
pub fn queue_job(
&mut self,
experiment_id: &str,
being: &str,
corpus: &str,
machine: &str,
queued_by: &str,
) -> String {
self.inner.submit(
TrainingPayload {
experiment_id: experiment_id.to_string(),
being: being.to_string(),
corpus: corpus.to_string(),
},
machine,
queued_by,
)
}
pub fn claim_job(&mut self, machine: &str) -> Option<&TrainingJob> {
self.inner.claim(machine)
}
pub fn complete_job(&mut self, job_id: &str, results_path: &str) -> bool {
self.inner
.complete(job_id, serde_json::json!({ "results_path": results_path }))
}
pub fn fail_job(&mut self, job_id: &str, error: &str) -> bool {
self.inner.fail(job_id, error)
}
pub fn list_jobs(&self, status: Option<&JobStatus>) -> Vec<&TrainingJob> {
self.inner.list(status)
}
pub fn get_job(&self, job_id: &str) -> Option<&TrainingJob> {
self.inner.get(job_id)
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub fn format_table_filtered(&self, status: Option<&JobStatus>) -> String {
let jobs = self.list_jobs(status);
self.format_jobs(&jobs)
}
pub fn format_table(&self) -> String {
let jobs = self.list_jobs(None);
self.format_jobs(&jobs)
}
fn format_jobs(&self, jobs: &[&TrainingJob]) -> String {
if jobs.is_empty() {
return "No training jobs.\n".to_string();
}
let mut lines = Vec::new();
lines.push(format!(
" {:<12} {:<12} {:<25} {:<10} {:<8} {}",
"Job ID", "Experiment", "Being", "Machine", "Status", "Queued By"
));
lines.push(format!(" {}", "-".repeat(85)));
for job in jobs {
lines.push(format!(
" {:<12} {:<12} {:<25} {:<10} {:<8} {}",
job.id,
job.payload.experiment_id,
truncate(&job.payload.being, 25),
job.worker,
job.status.as_str(),
job.queued_by,
));
}
let (queued, running, complete, failed) = self.inner.counts();
lines.push(format!(
"\n Total: {} jobs ({} queued, {} running, {} complete, {} failed)",
jobs.len(),
queued,
running,
complete,
failed,
));
lines.join("\n") + "\n"
}
}
impl Default for TrainingQueue {
fn default() -> Self {
Self::new()
}
}
fn truncate(s: &str, max: usize) -> String {
if s.len() <= max {
s.to_string()
} else {
format!("{}...", &s[..max.saturating_sub(3)])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_queue_job() {
let mut q = TrainingQueue::new();
let id = q.queue_job("EXPR-3275", "santiago-bahai", "bahai", "DGX", "Captain");
assert_eq!(id, "TRAIN-001");
assert_eq!(q.len(), 1);
let job = q.get_job("TRAIN-001").expect("job");
assert_eq!(job.status, JobStatus::Queued);
assert_eq!(job.payload.experiment_id, "EXPR-3275");
assert_eq!(job.payload.being, "santiago-bahai");
}
#[test]
fn test_claim_job() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "being-1", "corpus-1", "DGX", "Captain");
let job = q.claim_job("DGX").expect("claim");
assert_eq!(job.status, JobStatus::Running);
assert!(job.started_at.is_some());
assert!(q.claim_job("DGX").is_none());
}
#[test]
fn test_claim_filters_by_machine() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "being-1", "corpus-1", "DGX", "Captain");
q.queue_job("EXPR-2", "being-2", "corpus-2", "Mini", "Captain");
let job = q.claim_job("Mini").expect("claim");
assert_eq!(job.payload.experiment_id, "EXPR-2");
let job = q.claim_job("DGX").expect("claim");
assert_eq!(job.payload.experiment_id, "EXPR-1");
}
#[test]
fn test_complete_job() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "being", "corpus", "DGX", "Captain");
q.claim_job("DGX");
assert!(q.complete_job("TRAIN-001", "research/shared/eval-data/expr1/"));
let job = q.get_job("TRAIN-001").unwrap();
assert_eq!(job.status, JobStatus::Complete);
assert!(job.completed_at.is_some());
}
#[test]
fn test_fail_job() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "being", "corpus", "DGX", "Captain");
q.claim_job("DGX");
assert!(q.fail_job("TRAIN-001", "OOM at epoch 3"));
let job = q.get_job("TRAIN-001").unwrap();
assert_eq!(job.status, JobStatus::Failed);
assert_eq!(job.error.as_deref(), Some("OOM at epoch 3"));
}
#[test]
fn test_cannot_complete_queued_job() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "being", "corpus", "DGX", "Captain");
assert!(!q.complete_job("TRAIN-001", "path"));
}
#[test]
fn test_list_jobs_filter() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-1", "b1", "c1", "DGX", "Captain");
q.queue_job("EXPR-2", "b2", "c2", "DGX", "Captain");
q.claim_job("DGX");
assert_eq!(q.list_jobs(Some(&JobStatus::Queued)).len(), 1);
assert_eq!(q.list_jobs(Some(&JobStatus::Running)).len(), 1);
assert_eq!(q.list_jobs(None).len(), 2);
}
#[test]
fn test_job_ids_increment() {
let mut q = TrainingQueue::new();
assert_eq!(q.queue_job("A", "b", "c", "D", "E"), "TRAIN-001");
assert_eq!(q.queue_job("A", "b", "c", "D", "E"), "TRAIN-002");
assert_eq!(q.queue_job("A", "b", "c", "D", "E"), "TRAIN-003");
}
#[test]
fn test_format_table() {
let mut q = TrainingQueue::new();
q.queue_job("EXPR-3275", "santiago-bahai", "bahai", "DGX", "Captain");
let output = q.format_table();
assert!(output.contains("TRAIN-001"));
assert!(output.contains("EXPR-3275"));
assert!(output.contains("queued"));
assert!(output.contains("Total: 1 jobs"));
}
#[test]
fn test_empty_format() {
let q = TrainingQueue::new();
assert!(q.format_table().contains("No training jobs"));
}
#[test]
fn test_status_roundtrip() {
for status in &[
JobStatus::Queued,
JobStatus::Running,
JobStatus::Complete,
JobStatus::Failed,
] {
let s = status.as_str();
assert_eq!(&JobStatus::parse(s).expect("parse"), status);
}
}
}