use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
/// redb table holding one JSON-encoded [`JobRecord`] per job, keyed by its `job_id`.
const TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("jobs");
/// Retention window: terminal jobs that ended more than this many days ago are pruned.
pub const KEEP_DAYS: i64 = 30;
/// Soft row cap: when exceeded, the oldest terminal jobs are evicted (live jobs never are).
pub const CAP: usize = 2_000;
pub mod status {
    //! Lifecycle states a job row can be in.

    /// Waiting for the exclusive gate before it may run.
    pub const QUEUED: &str = "queued";
    /// Currently executing.
    pub const RUNNING: &str = "running";
    /// Finished successfully (terminal).
    pub const DONE: &str = "done";
    /// Finished unsuccessfully, or its handle was dropped unfinished (terminal).
    pub const FAILED: &str = "failed";

    /// True for the two end states, `done` and `failed`.
    pub fn is_terminal(s: &str) -> bool {
        matches!(s, DONE | FAILED)
    }
}
pub mod kind {
    //! Job kind identifiers, stored verbatim in `JobRecord::kind`.

    // Release pipeline.
    pub const RELEASE_RUN: &str = "release_run";
    pub const RELEASE_UNDO: &str = "release_undo";

    // Workspace management.
    pub const WORKSPACE_FETCH: &str = "workspace_fetch";
    pub const WORKSPACE_REPUBLISH: &str = "workspace_republish";
    pub const WORKSPACE_POPULATE: &str = "workspace_populate";
    pub const WORKSPACE_CLONE: &str = "workspace_clone";

    // Documentation generation.
    pub const DOCS_RENDER: &str = "docs_render";
    pub const DOCS_EXPORT: &str = "docs_export";
    pub const DOCS_BOOK: &str = "docs_book";
    pub const DOCS_BOOK_SVG: &str = "docs_book_svg";

    // Scanning and ingestion.
    pub const KNOWLEDGE_SCAN: &str = "knowledge_scan";
    pub const SCIP_INGEST: &str = "scip_ingest";
    pub const DEEPSCAN: &str = "deepscan";
    pub const SNAPSHOT: &str = "snapshot";
    pub const SYMBOL_SCAN: &str = "symbol_scan";
    pub const SECURITY_SCAN: &str = "security_scan";

    // Measurement, testing and analysis.
    pub const BENCH_RUN: &str = "bench_run";
    pub const TEST_MATRIX: &str = "test_matrix";
    pub const ARCH_GENERATE: &str = "arch_generate";
    pub const INDEX_BUILD: &str = "index_build";
    pub const VECTOR_EMBED: &str = "vector_embed";
    pub const BAKEOFF: &str = "bakeoff";
    pub const AIRGAP: &str = "airgap";
    pub const COVERAGE: &str = "coverage";

    // Execution environment.
    pub const CONTAINER: &str = "container";

    // Warehouse maintenance.
    pub const WAREHOUSE_COMPACT: &str = "warehouse_compact";
    pub const WAREHOUSE_EXPIRE: &str = "warehouse_expire";
}
/// Scheduling class of a job kind, as decided by [`class_of`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobClass {
    /// Serialized: at most one such job runs at a time under `start_scheduled`.
    Exclusive,
    /// Never gated; may run alongside anything.
    Parallel,
}
pub fn class_of(kind: &str) -> JobClass {
match kind {
kind::BENCH_RUN => JobClass::Exclusive,
_ => JobClass::Parallel,
}
}
/// Shorthand for `class_of(kind) == JobClass::Exclusive`.
pub fn is_exclusive(kind: &str) -> bool {
    class_of(kind) == JobClass::Exclusive
}
/// Process-wide, lazily created single-permit semaphore that serializes exclusive jobs.
fn exclusive_gate() -> &'static Arc<tokio::sync::Semaphore> {
    use std::sync::OnceLock;
    static GATE: OnceLock<Arc<tokio::sync::Semaphore>> = OnceLock::new();
    GATE.get_or_init(|| {
        let one_permit = tokio::sync::Semaphore::new(1);
        Arc::new(one_permit)
    })
}
/// One persisted job row, stored as JSON in the `jobs` redb table keyed by `job_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JobRecord {
    /// Unique id (a v4 UUID for handle-created jobs); also the table key.
    pub job_id: String,
    /// Job kind, normally one of the [`kind`] constants.
    pub kind: String,
    /// What the job operates on; meaning depends on `kind`.
    pub target: String,
    /// Workspace the job belongs to; matched by [`JobSelector::Workspace`].
    pub workspace: String,
    /// Lifecycle state, one of the [`status`] constants.
    pub status: String,
    /// Start time in microseconds since the Unix epoch. Reset when a queued job starts
    /// running, so `elapsed_ms` excludes queue time.
    pub ts_start_micros: i64,
    /// End time in Unix microseconds; `None` until the job is terminal.
    pub ts_end_micros: Option<i64>,
    /// Milliseconds between `ts_start_micros` and `ts_end_micros`; `None` until terminal.
    pub elapsed_ms: Option<i64>,
    /// JSON-encoded detail payload, or an empty string when the detail was `null`.
    pub detail_json: String,
    /// Free-form reference to a produced result (e.g. `"bench_runs:abc"`); empty if none.
    pub result_ref: String,
    /// Id of the parent job for child jobs. `#[serde(default)]` makes a missing field
    /// decode as `None`.
    #[serde(default)]
    pub parent_id: Option<String>,
}
impl JobRecord {
    /// Whether this row is in an end state (`done` or `failed`).
    pub fn is_terminal(&self) -> bool {
        let current = self.status.as_str();
        status::is_terminal(current)
    }
}
/// Filter used by `JobStore::list`.
#[derive(Clone, Debug)]
pub enum JobSelector {
    /// Every job.
    All,
    /// Jobs whose `workspace` equals the given name.
    Workspace(String),
    /// Jobs whose `kind` equals the given kind string.
    Kind(String),
}
impl JobSelector {
fn matches(&self, r: &JobRecord) -> bool {
match self {
JobSelector::All => true,
JobSelector::Workspace(w) => &r.workspace == w,
JobSelector::Kind(k) => &r.kind == k,
}
}
}
/// Current wall-clock time as microseconds since the Unix epoch (UTC).
fn now_micros() -> i64 {
    let now = chrono::Utc::now();
    now.timestamp_micros()
}
/// Type-erased, cheaply cloneable callback that persists (upserts) a [`JobRecord`].
///
/// Built with [`JobSink::redb`], [`JobSink::remote`] or [`JobSink::noop`]; cloning only
/// bumps the `Arc` refcount.
#[derive(Clone)]
pub struct JobSink(Arc<dyn Fn(&JobRecord) + Send + Sync>);
impl JobSink {
    /// Sink that writes every record into the given redb database.
    ///
    /// Best-effort by design: write or prune failures are reported on stderr and otherwise
    /// ignored, so job tracking never aborts the job being tracked. After each terminal row
    /// the table is pruned with [`KEEP_DAYS`] / [`CAP`].
    pub fn redb(db: Arc<Database>) -> Self {
        JobSink(Arc::new(move |rec: &JobRecord| {
            if let Err(e) = redb_upsert(&db, rec) {
                // The warning glyph was previously mis-encoded (cp1252 mojibake of U+26A0).
                eprintln!(
                    " ⚠ jobs: dropped {} row for {}/{} (non-fatal): {e:#}",
                    rec.status, rec.kind, rec.target
                );
                // Don't prune when the triggering write itself failed.
                return;
            }
            if rec.is_terminal() {
                if let Err(e) = redb_prune(&db, KEEP_DAYS, CAP) {
                    eprintln!(" ⚠ jobs: prune failed (non-fatal): {e:#}");
                }
            }
        }))
    }

    /// Sink that forwards every record to an arbitrary callback.
    ///
    /// NOTE(review): presumably used to ship rows to a remote store; the callback owns its
    /// own error handling.
    pub fn remote(f: impl Fn(&JobRecord) + Send + Sync + 'static) -> Self {
        JobSink(Arc::new(f))
    }

    /// Sink that discards every record.
    pub fn noop() -> Self {
        JobSink(Arc::new(|_| {}))
    }

    /// Deliver one record to the underlying callback.
    fn upsert(&self, rec: &JobRecord) {
        (self.0)(rec)
    }
}
/// Live handle owning one job row.
///
/// Every state change is written through `sink`. Consuming it with `finish`/`fail` records
/// a terminal state; dropping it unfinished records the job as failed (see the `Drop` impl).
pub struct JobHandle {
    /// Destination for every upsert of `rec`.
    sink: JobSink,
    /// In-memory copy of the row as last written.
    rec: JobRecord,
    /// Set once a terminal state has been written, so `Drop` does not write another one.
    finished: bool,
    /// Exclusive-gate permit held by exclusive jobs; declared last, so it is released only
    /// after `Drop::drop` has written the final row.
    _gate_permit: Option<tokio::sync::OwnedSemaphorePermit>,
}
impl JobHandle {
    /// Start a top-level job immediately.
    ///
    /// Writes a `running` row through `sink` and returns the handle owning it. Not gated by
    /// the exclusive scheduler; use [`JobHandle::start_scheduled`] for that.
    pub fn start(
        sink: JobSink,
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
    ) -> Self {
        let rec = Self::new_record(kind, target, workspace, detail, status::RUNNING, None);
        sink.upsert(&rec);
        JobHandle { sink, rec, finished: false, _gate_permit: None }
    }

    /// Start a job immediately as a child of `parent_id` (stored in `JobRecord::parent_id`).
    ///
    /// Like [`JobHandle::start`], this is not gated by the exclusive scheduler.
    pub fn start_child(
        sink: JobSink,
        parent_id: &str,
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
    ) -> Self {
        let rec = Self::new_record(
            kind,
            target,
            workspace,
            detail,
            status::RUNNING,
            Some(parent_id.to_string()),
        );
        sink.upsert(&rec);
        JobHandle { sink, rec, finished: false, _gate_permit: None }
    }

    /// Start a job, honouring its scheduling class.
    ///
    /// Parallel kinds behave exactly like [`JobHandle::start`]. Exclusive kinds first write a
    /// `queued` row, wait for the process-wide exclusive gate, then flip to `running` with a
    /// fresh start timestamp (so `elapsed_ms` excludes queue time). The returned handle holds
    /// the gate permit until it is dropped.
    ///
    /// # Cancellation
    /// If this future is dropped while still queued, the partially built handle is dropped
    /// too and its `Drop` impl records the job as failed, so no `queued` row is left behind.
    ///
    /// # Panics
    /// Only if the gate semaphore is closed, which this module never does.
    pub async fn start_scheduled(
        sink: JobSink,
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
    ) -> Self {
        if !is_exclusive(kind) {
            return Self::start(sink, kind, target, workspace, detail);
        }
        let rec = Self::new_record(kind, target, workspace, detail, status::QUEUED, None);
        sink.upsert(&rec);
        // Own the row in a handle *before* awaiting: previously a cancelled wait left a
        // permanent `queued` zombie because nothing was responsible for the row.
        let mut handle = JobHandle { sink, rec, finished: false, _gate_permit: None };
        let permit = exclusive_gate()
            .clone()
            .acquire_owned()
            .await
            .expect("exclusive job gate is never closed");
        handle._gate_permit = Some(permit);
        handle.rec.status = status::RUNNING.to_string();
        handle.rec.ts_start_micros = now_micros();
        handle.sink.upsert(&handle.rec);
        handle
    }

    /// Build a fresh record with a random v4 UUID, the given status and start time = now.
    fn new_record(
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
        initial_status: &str,
        parent_id: Option<String>,
    ) -> JobRecord {
        JobRecord {
            job_id: uuid::Uuid::new_v4().to_string(),
            kind: kind.to_string(),
            target: target.to_string(),
            workspace: workspace.to_string(),
            status: initial_status.to_string(),
            ts_start_micros: now_micros(),
            ts_end_micros: None,
            elapsed_ms: None,
            detail_json: detail_to_string(detail),
            result_ref: String::new(),
            parent_id,
        }
    }

    /// The job's unique id (usable as `parent_id` for children).
    pub fn job_id(&self) -> &str {
        &self.rec.job_id
    }

    /// Mark the job `done`, replacing its detail and recording `result_ref`.
    pub fn finish(mut self, detail: serde_json::Value, result_ref: &str) {
        self.terminate(status::DONE, detail_to_string(detail), result_ref.to_string());
    }

    /// Mark the job `failed`, storing `{"error": "<err with context chain>"}` as detail.
    pub fn fail(mut self, err: &anyhow::Error) {
        let detail = serde_json::json!({ "error": format!("{err:#}") }).to_string();
        self.terminate(status::FAILED, detail, String::new());
    }

    /// Mark the job `failed` with a caller-supplied detail payload.
    pub fn fail_with_detail(mut self, detail: serde_json::Value) {
        self.terminate(status::FAILED, detail_to_string(detail), String::new());
    }

    /// Write the terminal row and flag the handle finished so `Drop` does not write again.
    fn terminate(&mut self, st: &str, detail_json: String, result_ref: String) {
        let end = now_micros();
        self.rec.status = st.to_string();
        self.rec.ts_end_micros = Some(end);
        // Wall-clock time can step backwards; never report a negative duration.
        self.rec.elapsed_ms = Some(((end - self.rec.ts_start_micros) / 1000).max(0));
        self.rec.detail_json = detail_json;
        self.rec.result_ref = result_ref;
        self.sink.upsert(&self.rec);
        self.finished = true;
    }
}
impl Drop for JobHandle {
    /// Zombie guard: a handle dropped without `finish`/`fail` (panic, `?` early return,
    /// cancellation) still leaves a terminal `failed` row instead of a stale live one.
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        let reason = serde_json::json!({
            "error": "job dropped without finish/fail (panic or early return)"
        });
        self.terminate(status::FAILED, reason.to_string(), String::new());
    }
}
fn detail_to_string(detail: serde_json::Value) -> String {
if detail.is_null() {
String::new()
} else {
detail.to_string()
}
}
/// Handle to the redb-backed job table at `<root>/jobs.redb`.
pub struct JobStore {
    /// Shared database; each sink from `JobStore::sink` holds its own `Arc` clone.
    db: Arc<Database>,
    /// Temp dir holding a copied snapshot when opened via `open_read_only` against a locked
    /// database; `None` otherwise. Declared after `db` so the store's own database reference
    /// is released before the directory is deleted (sinks may still hold clones).
    _snapshot: Option<tempfile::TempDir>,
}
impl JobStore {
pub fn open(root: &Path) -> Result<Self> {
std::fs::create_dir_all(root)
.with_context(|| format!("create jobs root {}", root.display()))?;
let path = root.join("jobs.redb");
let db = Database::create(&path).with_context(|| format!("open {}", path.display()))?;
let w = db.begin_write()?;
{
let _ = w.open_table(TABLE)?;
}
w.commit()?;
Ok(Self { db: Arc::new(db), _snapshot: None })
}
pub fn open_read_only(root: &Path) -> Result<Self> {
match Self::open(root) {
Ok(s) => Ok(s),
Err(e) if is_lock_error(&e) => {
let live = root.join("jobs.redb");
if !live.exists() {
return Err(e);
}
let tmp = tempfile::Builder::new()
.prefix("nornir-jobs-snapshot-")
.tempdir()
.context("create temp dir for jobs snapshot")?;
let snap = tmp.path().join("jobs.redb");
copy_redb_consistent(&live, &snap).with_context(|| {
format!("copy locked jobs {} -> snapshot {}", live.display(), snap.display())
})?;
let db = Database::create(&snap)
.with_context(|| format!("open jobs snapshot {}", snap.display()))?;
Ok(Self { db: Arc::new(db), _snapshot: Some(tmp) })
}
Err(e) => Err(e),
}
}
pub fn sink(&self) -> JobSink {
JobSink::redb(self.db.clone())
}
pub fn start(
&self,
kind: &str,
target: &str,
workspace: &str,
detail: serde_json::Value,
) -> JobHandle {
JobHandle::start(self.sink(), kind, target, workspace, detail)
}
pub fn start_child(
&self,
parent_id: &str,
kind: &str,
target: &str,
workspace: &str,
detail: serde_json::Value,
) -> JobHandle {
JobHandle::start_child(self.sink(), parent_id, kind, target, workspace, detail)
}
pub async fn start_scheduled(
&self,
kind: &str,
target: &str,
workspace: &str,
detail: serde_json::Value,
) -> JobHandle {
JobHandle::start_scheduled(self.sink(), kind, target, workspace, detail).await
}
pub fn submit(&self, rec: &JobRecord) -> Result<()> {
redb_upsert(&self.db, rec)?;
if rec.is_terminal() {
redb_prune(&self.db, KEEP_DAYS, CAP)?;
}
Ok(())
}
pub fn list(&self, sel: &JobSelector) -> Result<Vec<JobRecord>> {
let r = self.db.begin_read()?;
let t = r.open_table(TABLE)?;
let mut out = Vec::new();
for row in t.iter()? {
let (_k, v) = row?;
let rec: JobRecord =
serde_json::from_slice(v.value()).context("decode job record")?;
if sel.matches(&rec) {
out.push(rec);
}
}
out.sort_by(|a, b| b.ts_start_micros.cmp(&a.ts_start_micros));
Ok(out)
}
pub fn prune(&self, keep_days: i64, cap: usize) -> Result<usize> {
redb_prune(&self.db, keep_days, cap)
}
}
/// Insert or overwrite `rec` under its `job_id` in one committed write transaction.
fn redb_upsert(db: &Database, rec: &JobRecord) -> Result<()> {
    let encoded = serde_json::to_vec(rec).context("encode job record")?;
    let txn = db.begin_write()?;
    {
        let mut jobs = txn.open_table(TABLE)?;
        jobs.insert(rec.job_id.as_str(), encoded.as_slice())?;
    }
    txn.commit()?;
    Ok(())
}
/// Apply retention to the job table and return how many rows were removed.
///
/// Works on a snapshot of all rows taken in one read transaction:
/// 1. terminal jobs whose end time (or start time, if no end was recorded) is older than
///    `keep_days` days are removed;
/// 2. if more than `cap` rows would still remain, the oldest remaining terminal jobs (by
///    start time) are removed until the count fits or no terminal jobs are left.
///
/// Queued/running jobs are never removed, so the table may exceed `cap` while many jobs are
/// live. Deletions happen in a separate write transaction.
///
/// # Errors
/// Propagates redb transaction/table errors and JSON decode failures of stored rows.
fn redb_prune(db: &Database, keep_days: i64, cap: usize) -> Result<usize> {
    let all: Vec<JobRecord> = {
        let r = db.begin_read()?;
        let t = r.open_table(TABLE)?;
        let mut v = Vec::new();
        for row in t.iter()? {
            let (_k, val) = row?;
            v.push(serde_json::from_slice::<JobRecord>(val.value()).context("decode job record")?);
        }
        v
    };

    // Saturating so a very large `keep_days` (e.g. "keep forever") cannot overflow i64.
    let keep_micros = keep_days.saturating_mul(86_400 * 1_000_000);
    let cutoff = now_micros().saturating_sub(keep_micros);
    let is_aged = |rec: &JobRecord| rec.ts_end_micros.unwrap_or(rec.ts_start_micros) < cutoff;

    // Pass 1: age-based retention.
    let mut victims: Vec<String> = all
        .iter()
        .filter(|&r| r.is_terminal() && is_aged(r))
        .map(|r| r.job_id.clone())
        .collect();

    // Pass 2: count cap. "Terminal and not aged" is exactly "terminal and not already a
    // victim" (job ids are unique table keys), avoiding an O(n·m) `victims.contains` scan
    // that would otherwise run after every terminal upsert.
    let surviving = all.len().saturating_sub(victims.len());
    if surviving > cap {
        let mut terminal_left: Vec<&JobRecord> = all
            .iter()
            .filter(|&r| r.is_terminal() && !is_aged(r))
            .collect();
        terminal_left.sort_by_key(|r| r.ts_start_micros);
        let need = surviving - cap;
        victims.extend(terminal_left.into_iter().take(need).map(|r| r.job_id.clone()));
    }

    if victims.is_empty() {
        return Ok(0);
    }
    let w = db.begin_write()?;
    {
        let mut t = w.open_table(TABLE)?;
        for id in &victims {
            t.remove(id.as_str())?;
        }
    }
    w.commit()?;
    Ok(victims.len())
}
/// Heuristic: does any error in the chain look like redb's "database is locked by another
/// opener" failure? Matched on message text.
fn is_lock_error(err: &anyhow::Error) -> bool {
    const MARKERS: [&str; 2] = ["Database already open", "Cannot acquire lock"];
    err.chain().any(|cause| {
        let text = cause.to_string();
        MARKERS.iter().any(|marker| text.contains(marker))
    })
}
/// Best-effort copy of a database file that another process may be writing.
///
/// Retries (up to 64 times, with a short, slowly growing sleep) until the live file's size
/// is unchanged across the copy and the copy is at least that large. This only checks size
/// stability; it is not a transactional snapshot. Blocks the calling thread while sleeping.
fn copy_redb_consistent(live: &Path, dst: &Path) -> Result<()> {
    const MAX_ATTEMPTS: usize = 64;
    let len_of = |p: &Path| -> Result<u64> {
        let meta = std::fs::metadata(p).with_context(|| format!("stat {}", p.display()))?;
        Ok(meta.len())
    };
    let mut attempt = 0;
    while attempt < MAX_ATTEMPTS {
        let size_before = len_of(live)?;
        std::fs::copy(live, dst)
            .with_context(|| format!("copy {} -> {}", live.display(), dst.display()))?;
        let size_after = len_of(live)?;
        let stable = size_before == size_after;
        if stable && len_of(dst)? >= size_after {
            return Ok(());
        }
        let pause_ms = 2 + attempt as u64 / 4;
        std::thread::sleep(std::time::Duration::from_millis(pause_ms));
        attempt += 1;
    }
    anyhow::bail!(
        "snapshot of locked jobs db {} never reached a size-stable window after {MAX_ATTEMPTS} attempts",
        live.display()
    )
}
/// Description of a container invocation: `<engine> run <run_flags...> <image> <cmd...>`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContainerSpec {
    /// Container engine executable, looked up on `PATH` (e.g. `"podman"`).
    pub engine: String,
    /// Image reference passed to `run`.
    pub image: String,
    /// Command and arguments executed inside the container.
    pub cmd: Vec<String>,
    /// Extra flags placed between `run` and the image (e.g. `--rm`).
    pub run_flags: Vec<String>,
}
impl ContainerSpec {
pub fn podman(image: &str, cmd: &[&str]) -> Self {
ContainerSpec {
engine: "podman".into(),
image: image.into(),
cmd: cmd.iter().map(|s| s.to_string()).collect(),
run_flags: vec!["--rm".into()],
}
}
pub fn argv(&self) -> Vec<String> {
let mut v = vec![self.engine.clone(), "run".into()];
v.extend(self.run_flags.iter().cloned());
v.push(self.image.clone());
v.extend(self.cmd.iter().cloned());
v
}
}
/// Result of a completed container run (spawn and wait both succeeded).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ContainerOutcome {
    /// Process exit code; `None` when the process did not exit with a code
    /// (`ExitStatus::code`, e.g. killed by a signal on Unix).
    pub exit_code: Option<i32>,
    /// Captured stdout, lossily UTF-8 decoded and capped by `truncate_log`.
    pub stdout: String,
    /// Captured stderr, lossily UTF-8 decoded and capped by `truncate_log`.
    pub stderr: String,
}
/// Maximum number of log bytes kept before the truncation marker is appended.
const LOG_CAP_BYTES: usize = 16 * 1024;

/// Cap `s` to at most [`LOG_CAP_BYTES`] bytes of content, appending `"\n…[truncated]"` when
/// anything was cut. Strings within the cap are returned unchanged.
///
/// The cut is moved back to the nearest UTF-8 char boundary: `String::truncate` panics on a
/// non-boundary index, and container logs routinely contain multi-byte characters (including
/// the 3-byte U+FFFD that `from_utf8_lossy` inserts), so the previous fixed cut could panic.
fn truncate_log(mut s: String) -> String {
    if s.len() > LOG_CAP_BYTES {
        // Index 0 is always a char boundary, so the search always succeeds.
        let cut = (0..=LOG_CAP_BYTES)
            .rev()
            .find(|&i| s.is_char_boundary(i))
            .unwrap_or(0);
        s.truncate(cut);
        s.push_str("\n…[truncated]");
    }
    s
}
/// Run a container synchronously and track it as a `container` job.
///
/// The job row is `running` while the container executes and becomes `done` on a zero exit
/// or `failed` otherwise. The final detail carries argv, pid, exit code and capped logs.
///
/// # Errors
/// Returns `Err` (and records the job as failed) only when the engine cannot be spawned or
/// waited on. A container that exits non-zero is still `Ok`; inspect
/// [`ContainerOutcome::exit_code`].
pub fn run_container(
    sink: JobSink,
    spec: &ContainerSpec,
    target: &str,
    workspace: &str,
) -> Result<ContainerOutcome> {
    let argv = spec.argv();
    let job = JobHandle::start(
        sink,
        kind::CONTAINER,
        target,
        workspace,
        serde_json::json!({ "argv": argv, "engine": spec.engine, "image": spec.image }),
    );

    let mut command = std::process::Command::new(&spec.engine);
    command
        .arg("run")
        .args(&spec.run_flags)
        .arg(&spec.image)
        .args(&spec.cmd)
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped());

    let child = match command.spawn() {
        Ok(spawned) => spawned,
        Err(spawn_err) => {
            let err = anyhow::anyhow!(spawn_err).context(format!(
                "spawn container engine `{}` (is it installed and on PATH?)",
                spec.engine
            ));
            job.fail(&err);
            return Err(err);
        }
    };
    let pid = child.id();

    let output = match child.wait_with_output() {
        Ok(captured) => captured,
        Err(wait_err) => {
            let err = anyhow::anyhow!(wait_err).context("wait for container");
            job.fail(&err);
            return Err(err);
        }
    };

    let exit_code = output.status.code();
    let stdout = truncate_log(String::from_utf8_lossy(&output.stdout).into_owned());
    let stderr = truncate_log(String::from_utf8_lossy(&output.stderr).into_owned());
    let detail = serde_json::json!({
        "argv": argv,
        "pid": pid,
        "exit_code": exit_code,
        "stdout": stdout,
        "stderr": stderr,
    });
    let succeeded = output.status.success();
    let outcome = ContainerOutcome { exit_code, stdout, stderr };

    if succeeded {
        job.finish(detail, "");
    } else {
        job.fail_with_detail(detail);
    }
    Ok(outcome)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the job store, job handles (including the drop guard), retention,
    //! the exclusive scheduler gate and the container runner. Podman-backed tests return
    //! early when `podman --version` does not succeed.
    use super::*;
    /// Per-test scratch directory under the system temp dir, suffixed with the process id.
    fn tmpdir(tag: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("nornir-jobs-{tag}-{}", std::process::id()))
    }
    /// finish / fail / drop each leave exactly one terminal row, and selectors filter them.
    #[test]
    fn start_finish_fail_and_drop_zombie_guard() {
        let dir = tmpdir("life");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let h = store.start(kind::BENCH_RUN, "facett", "nordisk", serde_json::json!({"n": 1}));
        let id_done = h.job_id().to_string();
        h.finish(serde_json::json!({"rows": 12}), "bench_runs:abc");
        let h = store.start(kind::TEST_MATRIX, "holger", "nordisk", serde_json::Value::Null);
        h.fail(&anyhow::anyhow!("boom"));
        // Dropping an unfinished handle must record it as failed via the Drop guard.
        let id_drop = {
            let h = store.start(kind::DOCS_BOOK_SVG, "nornir", "nordisk", serde_json::Value::Null);
            let id = h.job_id().to_string();
            drop(h);
            id
        };
        let all = store.list(&JobSelector::All).unwrap();
        assert_eq!(all.len(), 3, "three jobs recorded");
        assert!(all.iter().all(|r| r.is_terminal()), "no zombie running rows");
        let done = all.iter().find(|r| r.job_id == id_done).unwrap();
        assert_eq!(done.status, status::DONE);
        assert_eq!(done.result_ref, "bench_runs:abc");
        assert!(done.elapsed_ms.is_some(), "finished job has elapsed");
        assert!(done.ts_end_micros.is_some());
        let dropped = all.iter().find(|r| r.job_id == id_drop).unwrap();
        assert_eq!(dropped.status, status::FAILED, "dropped handle records failed");
        assert!(dropped.detail_json.contains("dropped"), "drop reason carried");
        assert_eq!(store.list(&JobSelector::Kind(kind::BENCH_RUN.into())).unwrap().len(), 1);
        assert_eq!(store.list(&JobSelector::Workspace("nordisk".into())).unwrap().len(), 3);
        assert_eq!(store.list(&JobSelector::Workspace("other".into())).unwrap().len(), 0);
        std::fs::remove_dir_all(&dir).ok();
    }
    /// The terminal write overwrites the `running` row keyed by job_id rather than appending.
    #[test]
    fn running_then_done_overwrites_in_place_no_duplicate() {
        let dir = tmpdir("overwrite");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let h = store.start(kind::RELEASE_RUN, "ws", "ws", serde_json::Value::Null);
        let id = h.job_id().to_string();
        let running = store.list(&JobSelector::All).unwrap();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].status, status::RUNNING);
        assert!(running[0].elapsed_ms.is_none());
        h.finish(serde_json::Value::Null, "release_events:r1");
        let done = store.list(&JobSelector::All).unwrap();
        assert_eq!(done.len(), 1, "finish overwrote the running row, not appended");
        assert_eq!(done[0].job_id, id);
        assert_eq!(done[0].status, status::DONE);
        assert_eq!(done[0].result_ref, "release_events:r1");
        std::fs::remove_dir_all(&dir).ok();
    }
    /// Retention removes an aged terminal row but never a running one.
    #[test]
    fn prune_drops_aged_terminal_and_caps_count_but_keeps_running() {
        let dir = tmpdir("prune");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        // Five days past the retention window.
        let old_end = now_micros() - (KEEP_DAYS + 5) * 86_400 * 1_000_000;
        redb_upsert(
            &store.db,
            &JobRecord {
                job_id: "old".into(),
                kind: kind::DOCS_BOOK.into(),
                target: "t".into(),
                workspace: "w".into(),
                status: status::DONE.into(),
                ts_start_micros: old_end - 1000,
                ts_end_micros: Some(old_end),
                elapsed_ms: Some(1),
                detail_json: String::new(),
                result_ref: String::new(),
                parent_id: None,
            },
        )
        .unwrap();
        redb_upsert(
            &store.db,
            &JobRecord {
                job_id: "live".into(),
                kind: kind::BENCH_RUN.into(),
                target: "t".into(),
                workspace: "w".into(),
                status: status::RUNNING.into(),
                ts_start_micros: now_micros(),
                ts_end_micros: None,
                elapsed_ms: None,
                detail_json: String::new(),
                result_ref: String::new(),
                parent_id: None,
            },
        )
        .unwrap();
        let removed = store.prune(KEEP_DAYS, CAP).unwrap();
        assert_eq!(removed, 1, "only the aged terminal job is pruned");
        let left = store.list(&JobSelector::All).unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].job_id, "live", "the running job survives retention");
        std::fs::remove_dir_all(&dir).ok();
    }
    /// argv layout and serde round-trip of ContainerSpec.
    #[test]
    fn container_spec_argv_is_engine_run_flags_image_cmd() {
        let spec = ContainerSpec::podman("alpine:3", &["echo", "hi"]);
        assert_eq!(
            spec.argv(),
            vec!["podman", "run", "--rm", "alpine:3", "echo", "hi"]
        );
        let s = serde_json::to_string(&spec).unwrap();
        assert_eq!(serde_json::from_str::<ContainerSpec>(&s).unwrap(), spec);
    }
    /// A missing engine binary surfaces as Err and leaves a failed row with the error.
    #[test]
    fn container_job_missing_engine_records_failed_and_errs() {
        let dir = tmpdir("container-missing");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let spec = ContainerSpec {
            engine: "nornir-no-such-container-engine".into(),
            image: "alpine".into(),
            cmd: vec!["true".into()],
            run_flags: vec!["--rm".into()],
        };
        let res = run_container(store.sink(), &spec, "alpine", "ws");
        assert!(res.is_err(), "missing engine surfaces as Err");
        let jobs = store.list(&JobSelector::Kind(kind::CONTAINER.into())).unwrap();
        assert_eq!(jobs.len(), 1, "one container job recorded");
        assert_eq!(jobs[0].status, status::FAILED, "spawn failure → failed row");
        assert!(jobs[0].is_terminal());
        assert!(jobs[0].detail_json.contains("error"), "error carried in detail");
        std::fs::remove_dir_all(&dir).ok();
    }
    /// Real podman run (skipped without podman; pulls alpine:3 over the network if absent).
    #[test]
    fn container_job_real_podman_echo_tracks_done() {
        if std::process::Command::new("podman")
            .arg("--version")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|s| !s.success())
            .unwrap_or(true)
        {
            eprintln!("skip: podman not available");
            return;
        }
        let dir = tmpdir("container-podman");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let spec = ContainerSpec::podman(
            "docker.io/library/alpine:3",
            &["echo", "nornir-jobs-ok"],
        );
        let outcome = run_container(store.sink(), &spec, "alpine:3", "ws")
            .expect("container ran");
        assert_eq!(outcome.exit_code, Some(0), "echo exits 0");
        assert!(
            outcome.stdout.contains("nornir-jobs-ok"),
            "container stdout captured: {:?}",
            outcome.stdout
        );
        let jobs = store.list(&JobSelector::Kind(kind::CONTAINER.into())).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].status, status::DONE, "exit 0 → done");
        assert!(jobs[0].elapsed_ms.is_some());
        assert!(jobs[0].detail_json.contains("nornir-jobs-ok"), "logs on row");
        std::fs::remove_dir_all(&dir).ok();
    }
    /// Oversized logs are capped and marked; small logs pass through unchanged.
    #[test]
    fn truncate_log_caps_and_marks() {
        let big = "x".repeat(LOG_CAP_BYTES + 5000);
        let out = truncate_log(big);
        assert!(out.len() <= LOG_CAP_BYTES + 32, "capped near LOG_CAP_BYTES");
        assert!(out.ends_with("…[truncated]"), "truncation is marked");
        assert_eq!(truncate_log("ok".to_string()), "ok");
    }
    /// Non-zero exit is Ok(outcome) but records a failed row carrying the exit code.
    #[test]
    fn container_job_nonzero_exit_records_failed_keeps_outcome() {
        if std::process::Command::new("podman")
            .arg("--version")
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|s| !s.success())
            .unwrap_or(true)
        {
            eprintln!("skip: podman not available");
            return;
        }
        let dir = tmpdir("container-nonzero");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let spec = ContainerSpec::podman(
            "docker.io/library/alpine:3",
            &["sh", "-c", "exit 7"],
        );
        let outcome = run_container(store.sink(), &spec, "alpine:3", "ws")
            .expect("spawn/wait succeeded even though the container failed");
        assert_eq!(outcome.exit_code, Some(7), "non-zero exit captured");
        let jobs = store.list(&JobSelector::Kind(kind::CONTAINER.into())).unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].status, status::FAILED, "non-zero exit → failed row");
        assert!(jobs[0].detail_json.contains("\"exit_code\":7"), "exit code on row: {}", jobs[0].detail_json);
        std::fs::remove_dir_all(&dir).ok();
    }
    /// Only bench_run is exclusive; a sample of other kinds is parallel.
    #[test]
    fn classify_bench_exclusive_others_parallel() {
        assert!(is_exclusive(kind::BENCH_RUN), "bench is the heavy/serial kind");
        assert_eq!(class_of(kind::BENCH_RUN), JobClass::Exclusive);
        for k in [
            kind::RELEASE_RUN,
            kind::TEST_MATRIX,
            kind::ARCH_GENERATE,
            kind::COVERAGE,
            kind::CONTAINER,
            kind::DOCS_BOOK,
            kind::WORKSPACE_FETCH,
        ] {
            assert!(!is_exclusive(k), "{k} should be parallel");
            assert_eq!(class_of(k), JobClass::Parallel, "{k}");
        }
    }
    // Serializes the scheduler tests, which share the process-wide exclusive gate.
    // NOTE(review): the std MutexGuard is held across `.await` (clippy::await_holding_lock);
    // this appears to work because each #[tokio::test] runs on its own runtime/thread.
    static SCHED_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
    /// A second exclusive job waits as `queued` until the first finishes, then runs.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn exclusive_bench_jobs_serialize_second_observes_queued() {
        let _guard = SCHED_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let dir = tmpdir("sched-serialize");
        std::fs::remove_dir_all(&dir).ok();
        let store = Arc::new(JobStore::open(&dir).unwrap());
        let first = store
            .start_scheduled(kind::BENCH_RUN, "alpha", "ws", serde_json::Value::Null)
            .await;
        let first_id = first.job_id().to_string();
        assert_eq!(
            store
                .list(&JobSelector::Kind(kind::BENCH_RUN.into()))
                .unwrap()
                .iter()
                .find(|r| r.job_id == first_id)
                .unwrap()
                .status,
            status::RUNNING,
            "first bench runs immediately"
        );
        let store2 = store.clone();
        let second = tokio::spawn(async move {
            let h = store2
                .start_scheduled(kind::BENCH_RUN, "beta", "ws", serde_json::Value::Null)
                .await;
            let id = h.job_id().to_string();
            h.finish(serde_json::Value::Null, "");
            id
        });
        // Wait until the second job's row appears; while `first` holds the gate it must be queued.
        let second_id = poll_until(&store, |recs| {
            recs.iter().find(|r| r.target == "beta").map(|r| {
                assert_eq!(r.status, status::QUEUED, "second bench is queued behind the first");
                r.job_id.clone()
            })
        })
        .await;
        let recs = store.list(&JobSelector::Kind(kind::BENCH_RUN.into())).unwrap();
        let running: Vec<_> = recs.iter().filter(|r| r.status == status::RUNNING).collect();
        assert_eq!(running.len(), 1, "exactly one bench runs at a time");
        assert_eq!(running[0].job_id, first_id);
        // Finishing `first` drops its permit, letting the queued job acquire the gate.
        first.finish(serde_json::Value::Null, "");
        let got_id = second.await.unwrap();
        assert_eq!(got_id, second_id, "the queued bench is the one that ran");
        let final_recs = store.list(&JobSelector::Kind(kind::BENCH_RUN.into())).unwrap();
        for r in &final_recs {
            assert_eq!(r.status, status::DONE, "both benches finished: {r:?}");
        }
        std::fs::remove_dir_all(&dir).ok();
    }
    /// A parallel job starts `running` even while an exclusive job holds the gate.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn parallel_job_not_blocked_by_running_exclusive() {
        let _guard = SCHED_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let dir = tmpdir("sched-parallel");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let bench = store
            .start_scheduled(kind::BENCH_RUN, "alpha", "ws", serde_json::Value::Null)
            .await;
        let other = store
            .start_scheduled(kind::RELEASE_RUN, "rel", "ws", serde_json::Value::Null)
            .await;
        let other_rec = store
            .list(&JobSelector::Kind(kind::RELEASE_RUN.into()))
            .unwrap()
            .into_iter()
            .next()
            .unwrap();
        assert_eq!(other_rec.status, status::RUNNING, "parallel job is never gated");
        other.finish(serde_json::Value::Null, "");
        bench.finish(serde_json::Value::Null, "");
        std::fs::remove_dir_all(&dir).ok();
    }
    /// Poll `store` every 5 ms (up to 500 tries, ~2.5 s of sleeping) until `pick` returns
    /// `Some`; panics on timeout.
    async fn poll_until<T>(
        store: &JobStore,
        pick: impl Fn(&[JobRecord]) -> Option<T>,
    ) -> T {
        for _ in 0..500 {
            let recs = store.list(&JobSelector::All).unwrap();
            if let Some(v) = pick(&recs) {
                return v;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        panic!("condition not reached within timeout");
    }
    /// A Vec<JobRecord> survives a JSON round trip unchanged.
    #[test]
    fn job_record_vec_json_round_trips() {
        let recs = vec![JobRecord {
            job_id: "j".into(),
            kind: kind::ARCH_GENERATE.into(),
            target: "skade".into(),
            workspace: "nordisk".into(),
            status: status::DONE.into(),
            ts_start_micros: 1,
            ts_end_micros: Some(2),
            elapsed_ms: Some(0),
            detail_json: "{\"nodes\":310}".into(),
            result_ref: "architecture_wiring:xyz".into(),
            parent_id: None,
        }];
        let s = serde_json::to_string(&recs).unwrap();
        let back: Vec<JobRecord> = serde_json::from_str(&s).unwrap();
        assert_eq!(recs, back);
    }
    /// Child jobs persist their parent_id; the parent itself has none.
    #[test]
    fn child_jobs_carry_parent_id_and_persist() {
        let dir = tmpdir("hierarchy");
        std::fs::remove_dir_all(&dir).ok();
        let store = JobStore::open(&dir).unwrap();
        let parent = store.start(
            kind::WORKSPACE_POPULATE,
            "nordisk",
            "nordisk",
            serde_json::json!({ "phase": "populate" }),
        );
        let pid = parent.job_id().to_string();
        let c1 = store.start_child(&pid, kind::WORKSPACE_CLONE, "alpha", "nordisk", serde_json::Value::Null);
        let c1_id = c1.job_id().to_string();
        c1.finish(serde_json::json!({ "sha": "abc" }), "");
        let c2 = store.start_child(&pid, kind::WORKSPACE_CLONE, "beta", "nordisk", serde_json::Value::Null);
        c2.finish(serde_json::json!({ "sha": "def" }), "");
        let build = store.start_child(&pid, kind::WORKSPACE_REPUBLISH, "nordisk", "nordisk", serde_json::Value::Null);
        build.finish(serde_json::json!({ "snapshot": "snap-1" }), "");
        parent.finish(serde_json::json!({ "members": 2 }), "");
        let all = store.list(&JobSelector::Workspace("nordisk".into())).unwrap();
        assert_eq!(all.len(), 4, "parent + 2 clone children + 1 build child");
        let p = all.iter().find(|r| r.job_id == pid).unwrap();
        assert_eq!(p.parent_id, None, "the populate parent has no parent");
        assert_eq!(p.kind, kind::WORKSPACE_POPULATE);
        let children: Vec<&JobRecord> = all.iter().filter(|r| r.parent_id.as_deref() == Some(pid.as_str())).collect();
        assert_eq!(children.len(), 3, "two clones + one build are children of the populate");
        assert!(children.iter().any(|r| r.job_id == c1_id && r.kind == kind::WORKSPACE_CLONE));
        assert!(children.iter().all(|r| r.is_terminal()), "all children terminal");
        std::fs::remove_dir_all(&dir).ok();
    }
}