use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result};
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
/// redb table that stores the job rows: the key is the job id string and the
/// value is the JSON-encoded `JobRecord` bytes (see `redb_upsert`).
const TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("jobs");
/// Retention age in days handed to the prune step by the redb sink and by
/// `JobStore::submit` whenever a terminal record is written.
pub const KEEP_DAYS: i64 = 30;
/// Row-count cap handed to the prune step by the redb sink and by
/// `JobStore::submit` whenever a terminal record is written.
pub const CAP: usize = 2000;
/// String values used for `JobRecord::status`, plus the terminal-state test.
pub mod status {
    /// The job has been started and has not yet reached a terminal state.
    pub const RUNNING: &str = "running";
    /// Terminal state: the job completed.
    pub const DONE: &str = "done";
    /// Terminal state: the job failed.
    pub const FAILED: &str = "failed";

    /// Returns `true` when `s` is exactly one of the terminal status strings
    /// ([`DONE`] or [`FAILED`]); any other string, including [`RUNNING`],
    /// yields `false`.
    pub fn is_terminal(s: &str) -> bool {
        matches!(s, DONE | FAILED)
    }
}
/// String identifiers for the kinds of job that can be recorded. These are the
/// values callers pass as the `kind` argument of `JobStore::start` /
/// `JobHandle::start`, and what `JobSelector::Kind` is compared against.
// NOTE(review): what each kind actually runs is defined by the callers, not in
// this file — the names below are the only information available here.
pub mod kind {
pub const RELEASE_RUN: &str = "release_run";
pub const WORKSPACE_FETCH: &str = "workspace_fetch";
pub const WORKSPACE_REPUBLISH: &str = "workspace_republish";
pub const DOCS_RENDER: &str = "docs_render";
pub const DOCS_EXPORT: &str = "docs_export";
pub const DOCS_BOOK: &str = "docs_book";
pub const DOCS_BOOK_SVG: &str = "docs_book_svg";
pub const KNOWLEDGE_SCAN: &str = "knowledge_scan";
pub const DEEPSCAN: &str = "deepscan";
pub const BENCH_RUN: &str = "bench_run";
pub const TEST_MATRIX: &str = "test_matrix";
pub const ARCH_GENERATE: &str = "arch_generate";
pub const INDEX_BUILD: &str = "index_build";
pub const VECTOR_EMBED: &str = "vector_embed";
pub const BAKEOFF: &str = "bakeoff";
pub const AIRGAP: &str = "airgap";
}
/// One row of the jobs table: the full state of a single job. Rows are stored
/// JSON-encoded under their `job_id`, so writing a record with an existing id
/// overwrites the previous row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JobRecord {
/// Unique id of the job and the key of its row; `JobHandle::start` generates
/// a v4 UUID string.
pub job_id: String,
/// Kind of job; callers pass one of the `kind::*` constants.
pub kind: String,
/// Caller-supplied target name the job operates on.
pub target: String,
/// Caller-supplied workspace name; matched by `JobSelector::Workspace`.
pub workspace: String,
/// Current state; `JobHandle` writes one of the `status::*` constants.
pub status: String,
/// Start time as a UTC timestamp in microseconds (from `now_micros`).
pub ts_start_micros: i64,
/// End time in microseconds; `None` until the job is terminated.
pub ts_end_micros: Option<i64>,
/// Elapsed time in milliseconds, derived from the start and end timestamps
/// on termination; `None` until then.
pub elapsed_ms: Option<i64>,
/// Serialized JSON detail payload, or an empty string when the detail value
/// was JSON null. On failure it holds an object with an `"error"` message.
pub detail_json: String,
/// Caller-supplied reference to the job's result, set by `JobHandle::finish`;
/// empty while running and for failed jobs.
pub result_ref: String,
}
impl JobRecord {
    /// Returns `true` when this record's status is a terminal one, as decided
    /// by `status::is_terminal`.
    pub fn is_terminal(&self) -> bool {
        let current = self.status.as_str();
        status::is_terminal(current)
    }
}
/// Filter applied by `JobStore::list` to decide which records are returned.
#[derive(Debug, Clone)]
pub enum JobSelector {
/// Match every record.
All,
/// Match records whose `workspace` equals the given string exactly.
Workspace(String),
/// Match records whose `kind` equals the given string exactly.
Kind(String),
}
impl JobSelector {
fn matches(&self, r: &JobRecord) -> bool {
match self {
JobSelector::All => true,
JobSelector::Workspace(w) => &r.workspace == w,
JobSelector::Kind(k) => &r.kind == k,
}
}
}
/// Current UTC wall-clock time as a timestamp in microseconds.
fn now_micros() -> i64 {
    let now = chrono::Utc::now();
    now.timestamp_micros()
}
/// Destination for job record writes: a cloneable, shareable callback that is
/// invoked with every record a `JobHandle` upserts. Cloning copies the `Arc`,
/// so all clones call the same underlying closure.
#[derive(Clone)]
pub struct JobSink(Arc<dyn Fn(&JobRecord) + Send + Sync>);
impl JobSink {
    /// Builds a sink that writes records into the given redb database.
    ///
    /// Writes are best-effort: a failed upsert or prune is reported on stderr
    /// and otherwise ignored. After a terminal record has been written
    /// successfully, the table is pruned with [`KEEP_DAYS`] and [`CAP`].
    pub fn redb(db: Arc<Database>) -> Self {
        JobSink(Arc::new(move |rec: &JobRecord| match redb_upsert(&db, rec) {
            Err(e) => {
                eprintln!(" ⚠ jobs: dropped {} row for {}/{} (non-fatal): {e:#}", rec.status, rec.kind, rec.target);
            }
            // Only a terminal write triggers retention; running rows skip it.
            Ok(()) if rec.is_terminal() => {
                if let Err(e) = redb_prune(&db, KEEP_DAYS, CAP) {
                    eprintln!(" ⚠ jobs: prune failed (non-fatal): {e:#}");
                }
            }
            Ok(()) => {}
        }))
    }

    /// Builds a sink from an arbitrary callback `f`, which receives every
    /// record written through this sink.
    pub fn remote(f: impl Fn(&JobRecord) + Send + Sync + 'static) -> Self {
        let callback: Arc<dyn Fn(&JobRecord) + Send + Sync> = Arc::new(f);
        JobSink(callback)
    }

    /// Builds a sink that discards every record.
    pub fn noop() -> Self {
        JobSink(Arc::new(|_rec: &JobRecord| {}))
    }

    /// Forwards `rec` to the wrapped callback.
    fn upsert(&self, rec: &JobRecord) {
        let callback = &self.0;
        callback(rec)
    }
}
/// Live handle for one job. Created by `JobHandle::start`, ended by `finish`
/// or `fail`; if it is dropped without either, the `Drop` impl records the job
/// as failed.
pub struct JobHandle {
// Where every state change of this job is written.
sink: JobSink,
// The job's current record; mutated in place on termination.
rec: JobRecord,
// True once a terminal record has been produced, so `Drop` does not write a second one.
finished: bool,
}
impl JobHandle {
    /// Starts a new job: builds a `running` record with a fresh v4 UUID and the
    /// current time, writes it to `sink`, and returns the handle.
    ///
    /// * `kind`, `target`, `workspace` — copied verbatim into the record.
    /// * `detail` — serialized to JSON; a JSON null is stored as an empty string.
    pub fn start(
        sink: JobSink,
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
    ) -> Self {
        let rec = JobRecord {
            job_id: uuid::Uuid::new_v4().to_string(),
            kind: kind.to_string(),
            target: target.to_string(),
            workspace: workspace.to_string(),
            status: status::RUNNING.to_string(),
            ts_start_micros: now_micros(),
            ts_end_micros: None,
            elapsed_ms: None,
            detail_json: detail_to_string(detail),
            result_ref: String::new(),
        };
        sink.upsert(&rec);
        JobHandle { sink, rec, finished: false }
    }

    /// The id of this job's record.
    pub fn job_id(&self) -> &str {
        &self.rec.job_id
    }

    /// Ends the job as `done`, replacing the detail payload with `detail` and
    /// storing `result_ref`. Consumes the handle.
    pub fn finish(mut self, detail: serde_json::Value, result_ref: &str) {
        self.terminate(status::DONE, detail_to_string(detail), result_ref.to_string());
    }

    /// Ends the job as `failed`. The detail payload becomes
    /// `{"error": "<err formatted with {:#}>"}` and `result_ref` is left
    /// empty. Consumes the handle.
    pub fn fail(mut self, err: &anyhow::Error) {
        let detail = serde_json::json!({ "error": format!("{err:#}") }).to_string();
        self.terminate(status::FAILED, detail, String::new());
    }

    /// Moves the record into terminal state `st`, stamps the end time and the
    /// elapsed milliseconds, and writes the record to the sink.
    fn terminate(&mut self, st: &str, detail_json: String, result_ref: String) {
        // Mark the handle finished BEFORE calling the sink. If the sink
        // callback panics, unwinding drops this handle; with the flag still
        // false, `Drop` would call the sink a second time, and a second panic
        // during unwinding aborts the process.
        self.finished = true;
        let end = now_micros();
        self.rec.status = st.to_string();
        self.rec.ts_end_micros = Some(end);
        // The timestamps come from the wall clock, which can step backwards;
        // never record a negative duration.
        let elapsed_micros = end.saturating_sub(self.rec.ts_start_micros).max(0);
        self.rec.elapsed_ms = Some(elapsed_micros / 1000);
        self.rec.detail_json = detail_json;
        self.rec.result_ref = result_ref;
        self.sink.upsert(&self.rec);
    }
}
impl Drop for JobHandle {
fn drop(&mut self) {
if !self.finished {
let detail =
serde_json::json!({ "error": "job dropped without finish/fail (panic or early return)" })
.to_string();
self.terminate(status::FAILED, detail, String::new());
}
}
}
fn detail_to_string(detail: serde_json::Value) -> String {
if detail.is_null() {
String::new()
} else {
detail.to_string()
}
}
/// Handle on the jobs database (`jobs.redb`), used to start jobs, submit
/// records, list them and prune them.
pub struct JobStore {
// Shared database handle; cloned into every sink created by `sink()`.
db: Arc<Database>,
// Set only when `open_read_only` fell back to a copied snapshot: holds the
// temporary directory containing that copy so it stays alive with the store.
_snapshot: Option<tempfile::TempDir>,
}
impl JobStore {
    /// Opens (creating if necessary) the jobs database at `root/jobs.redb`.
    ///
    /// Creates `root` if it is missing and makes sure the jobs table exists by
    /// opening it once inside a committed write transaction.
    ///
    /// # Errors
    /// Fails if the directory cannot be created, the database cannot be
    /// opened, or the initial transaction fails.
    pub fn open(root: &Path) -> Result<Self> {
        std::fs::create_dir_all(root)
            .with_context(|| format!("create jobs root {}", root.display()))?;
        let db_path = root.join("jobs.redb");
        let database =
            Database::create(&db_path).with_context(|| format!("open {}", db_path.display()))?;
        let txn = database.begin_write()?;
        {
            // Opening the table in a write transaction creates it; the handle
            // must be gone before the transaction can be committed.
            let _jobs = txn.open_table(TABLE)?;
        }
        txn.commit()?;
        Ok(Self { db: Arc::new(database), _snapshot: None })
    }

    /// Opens the store for reading even when the live database is locked.
    ///
    /// First tries a normal [`JobStore::open`]. If that fails with a lock
    /// error and the live file exists, the file is copied into a fresh
    /// temporary directory and the copy is opened instead; the temporary
    /// directory is kept alive by the returned store. Any other error, or a
    /// lock error with no live file, is returned unchanged.
    pub fn open_read_only(root: &Path) -> Result<Self> {
        let lock_err = match Self::open(root) {
            Ok(store) => return Ok(store),
            Err(e) if is_lock_error(&e) => e,
            Err(e) => return Err(e),
        };
        let live = root.join("jobs.redb");
        if !live.exists() {
            return Err(lock_err);
        }
        let scratch = tempfile::Builder::new()
            .prefix("nornir-jobs-snapshot-")
            .tempdir()
            .context("create temp dir for jobs snapshot")?;
        let snap = scratch.path().join("jobs.redb");
        copy_redb_consistent(&live, &snap).with_context(|| {
            format!("copy locked jobs {} -> snapshot {}", live.display(), snap.display())
        })?;
        let database = Database::create(&snap)
            .with_context(|| format!("open jobs snapshot {}", snap.display()))?;
        Ok(Self { db: Arc::new(database), _snapshot: Some(scratch) })
    }

    /// Returns a redb-backed sink that writes into this store's database.
    pub fn sink(&self) -> JobSink {
        JobSink::redb(Arc::clone(&self.db))
    }

    /// Starts a job recorded in this store; see [`JobHandle::start`].
    pub fn start(
        &self,
        kind: &str,
        target: &str,
        workspace: &str,
        detail: serde_json::Value,
    ) -> JobHandle {
        let sink = self.sink();
        JobHandle::start(sink, kind, target, workspace, detail)
    }

    /// Writes `rec` as-is (overwriting any row with the same id) and, when the
    /// record is terminal, prunes the table with [`KEEP_DAYS`] and [`CAP`].
    /// Unlike the sink path, errors are returned to the caller.
    pub fn submit(&self, rec: &JobRecord) -> Result<()> {
        redb_upsert(&self.db, rec)?;
        if !rec.is_terminal() {
            return Ok(());
        }
        redb_prune(&self.db, KEEP_DAYS, CAP)?;
        Ok(())
    }

    /// Returns every record matching `sel`, newest start time first.
    ///
    /// # Errors
    /// Fails on a database error or if any stored row cannot be decoded.
    pub fn list(&self, sel: &JobSelector) -> Result<Vec<JobRecord>> {
        let txn = self.db.begin_read()?;
        let jobs = txn.open_table(TABLE)?;
        let mut hits = Vec::new();
        for entry in jobs.iter()? {
            let (_key, raw) = entry?;
            let rec: JobRecord =
                serde_json::from_slice(raw.value()).context("decode job record")?;
            if sel.matches(&rec) {
                hits.push(rec);
            }
        }
        // Stable sort, descending by start time.
        hits.sort_by_key(|rec| std::cmp::Reverse(rec.ts_start_micros));
        Ok(hits)
    }

    /// Prunes the table with explicit limits and returns the number of rows
    /// selected for removal; see `redb_prune`.
    pub fn prune(&self, keep_days: i64, cap: usize) -> Result<usize> {
        redb_prune(&self.db, keep_days, cap)
    }
}
/// Inserts or overwrites the row for `rec`, keyed by its `job_id`, in a single
/// committed write transaction. The value is the record's JSON encoding.
fn redb_upsert(db: &Database, rec: &JobRecord) -> Result<()> {
    let payload = serde_json::to_vec(rec).context("encode job record")?;
    let txn = db.begin_write()?;
    {
        // Scope the table handle so it is released before the commit.
        let mut jobs = txn.open_table(TABLE)?;
        jobs.insert(rec.job_id.as_str(), payload.as_slice())?;
    }
    txn.commit()?;
    Ok(())
}
/// Applies retention to the jobs table and returns how many rows were
/// selected for removal.
///
/// Two rules, both of which only ever remove *terminal* rows:
/// 1. Age: a terminal row whose end time (or start time, if it has no end
///    time) is older than `keep_days` days is removed.
/// 2. Cap: if more than `cap` rows would remain after rule 1, the remaining
///    terminal rows with the oldest start times are removed until the count
///    is back at `cap` or no terminal rows are left.
///
/// Running rows are never removed, so the table can stay above `cap` when it
/// holds more than `cap` running rows.
///
/// # Errors
/// Fails on a database error or if any stored row cannot be decoded.
fn redb_prune(db: &Database, keep_days: i64, cap: usize) -> Result<usize> {
    const MICROS_PER_DAY: i64 = 86_400 * 1_000_000;

    /// A row is aged out when it is terminal and ended (or, lacking an end
    /// time, started) before `cutoff`.
    fn is_aged(rec: &JobRecord, cutoff: i64) -> bool {
        rec.is_terminal() && rec.ts_end_micros.unwrap_or(rec.ts_start_micros) < cutoff
    }

    // Snapshot every row in a read transaction; victims are chosen from it.
    let all: Vec<JobRecord> = {
        let r = db.begin_read()?;
        let t = r.open_table(TABLE)?;
        let mut v = Vec::new();
        for row in t.iter()? {
            let (_k, val) = row?;
            v.push(serde_json::from_slice::<JobRecord>(val.value()).context("decode job record")?);
        }
        v
    };

    // Saturating arithmetic: a huge caller-supplied `keep_days` must not
    // overflow (panic in debug builds, wrap in release builds).
    let cutoff = now_micros().saturating_sub(keep_days.saturating_mul(MICROS_PER_DAY));

    let mut victims: Vec<&str> = all
        .iter()
        .filter(|&rec| is_aged(rec, cutoff))
        .map(|rec| rec.job_id.as_str())
        .collect();

    let surviving = all.len() - victims.len();
    if surviving > cap {
        // Rows not already chosen by the age rule are exactly those that are
        // not aged, so re-evaluating the predicate replaces a linear
        // `victims.contains(..)` scan per row.
        let mut terminal_left: Vec<&JobRecord> = all
            .iter()
            .filter(|&rec| rec.is_terminal() && !is_aged(rec, cutoff))
            .collect();
        // Stable sort, oldest start first: those are evicted first.
        terminal_left.sort_by_key(|rec| rec.ts_start_micros);
        let need = surviving - cap;
        victims.extend(terminal_left.into_iter().take(need).map(|rec| rec.job_id.as_str()));
    }

    if victims.is_empty() {
        return Ok(0);
    }

    let w = db.begin_write()?;
    {
        let mut t = w.open_table(TABLE)?;
        for id in &victims {
            t.remove(*id)?;
        }
    }
    w.commit()?;
    Ok(victims.len())
}
/// Returns `true` when any error in `err`'s cause chain has a message that
/// contains one of the known "database is locked / already open" phrases.
fn is_lock_error(err: &anyhow::Error) -> bool {
    const LOCK_MARKERS: [&str; 2] = ["Database already open", "Cannot acquire lock"];
    err.chain()
        .map(|cause| cause.to_string())
        .any(|msg| LOCK_MARKERS.iter().any(|marker| msg.contains(*marker)))
}
/// Copies the live database file to `dst`, retrying until a copy is taken
/// during which the live file's size did not change.
///
/// An attempt is accepted when the live file has the same length before and
/// after the copy and the destination is at least that long. Otherwise the
/// function sleeps briefly (a delay that grows slowly with the attempt number)
/// and tries again, for at most `MAX_ATTEMPTS` attempts.
///
/// # Errors
/// Fails if a stat or copy fails, or if no attempt met the size check.
// NOTE(review): this only checks file sizes; it does not detect a write that
// changes content without changing the length.
fn copy_redb_consistent(live: &Path, dst: &Path) -> Result<()> {
    const MAX_ATTEMPTS: usize = 64;

    /// Length in bytes of the file at `p`.
    fn file_len(p: &Path) -> Result<u64> {
        let meta = std::fs::metadata(p).with_context(|| format!("stat {}", p.display()))?;
        Ok(meta.len())
    }

    for attempt in 0..MAX_ATTEMPTS {
        let size_before = file_len(live)?;
        std::fs::copy(live, dst)
            .with_context(|| format!("copy {} -> {}", live.display(), dst.display()))?;
        let size_after = file_len(live)?;

        // The destination is only stat'ed when the live size held steady.
        let live_unchanged = size_before == size_after;
        if live_unchanged && file_len(dst)? >= size_after {
            return Ok(());
        }

        let pause_ms = 2 + attempt as u64 / 4;
        std::thread::sleep(std::time::Duration::from_millis(pause_ms));
    }

    anyhow::bail!(
        "snapshot of locked jobs db {} never reached a size-stable window after {MAX_ATTEMPTS} attempts",
        live.display()
    )
}
// Unit tests for the job ledger. Each test that touches the database works in
// its own directory under the system temp dir and removes it before and after.
#[cfg(test)]
mod tests {
use super::*;
// Per-test scratch directory: the tag separates tests, the process id
// separates concurrently running test binaries.
fn tmpdir(tag: &str) -> std::path::PathBuf {
std::env::temp_dir().join(format!("nornir-jobs-{tag}-{}", std::process::id()))
}
// Covers all three ways a handle can end — finish, fail, and plain drop — and
// checks that none of them leaves a `running` row behind. Also exercises the
// `Kind` and `Workspace` selectors.
#[test]
fn start_finish_fail_and_drop_zombie_guard() {
let dir = tmpdir("life");
std::fs::remove_dir_all(&dir).ok();
let store = JobStore::open(&dir).unwrap();
let h = store.start(kind::BENCH_RUN, "facett", "nordisk", serde_json::json!({"n": 1}));
let id_done = h.job_id().to_string();
h.finish(serde_json::json!({"rows": 12}), "bench_runs:abc");
let h = store.start(kind::TEST_MATRIX, "holger", "nordisk", serde_json::Value::Null);
h.fail(&anyhow::anyhow!("boom"));
// Dropping the handle without finish/fail must still record a terminal row.
let id_drop = {
let h = store.start(kind::DOCS_BOOK_SVG, "nornir", "nordisk", serde_json::Value::Null);
let id = h.job_id().to_string();
drop(h);
id
};
let all = store.list(&JobSelector::All).unwrap();
assert_eq!(all.len(), 3, "three jobs recorded");
assert!(all.iter().all(|r| r.is_terminal()), "no zombie running rows");
let done = all.iter().find(|r| r.job_id == id_done).unwrap();
assert_eq!(done.status, status::DONE);
assert_eq!(done.result_ref, "bench_runs:abc");
assert!(done.elapsed_ms.is_some(), "finished job has elapsed");
assert!(done.ts_end_micros.is_some());
let dropped = all.iter().find(|r| r.job_id == id_drop).unwrap();
assert_eq!(dropped.status, status::FAILED, "dropped handle records failed");
assert!(dropped.detail_json.contains("dropped"), "drop reason carried");
assert_eq!(store.list(&JobSelector::Kind(kind::BENCH_RUN.into())).unwrap().len(), 1);
assert_eq!(store.list(&JobSelector::Workspace("nordisk".into())).unwrap().len(), 3);
assert_eq!(store.list(&JobSelector::Workspace("other".into())).unwrap().len(), 0);
std::fs::remove_dir_all(&dir).ok();
}
// The running row and the terminal row share one job id, so finishing must
// replace the row rather than add a second one.
#[test]
fn running_then_done_overwrites_in_place_no_duplicate() {
let dir = tmpdir("overwrite");
std::fs::remove_dir_all(&dir).ok();
let store = JobStore::open(&dir).unwrap();
let h = store.start(kind::RELEASE_RUN, "ws", "ws", serde_json::Value::Null);
let id = h.job_id().to_string();
let running = store.list(&JobSelector::All).unwrap();
assert_eq!(running.len(), 1);
assert_eq!(running[0].status, status::RUNNING);
assert!(running[0].elapsed_ms.is_none());
h.finish(serde_json::Value::Null, "release_events:r1");
let done = store.list(&JobSelector::All).unwrap();
assert_eq!(done.len(), 1, "finish overwrote the running row, not appended");
assert_eq!(done[0].job_id, id);
assert_eq!(done[0].status, status::DONE);
assert_eq!(done[0].result_ref, "release_events:r1");
std::fs::remove_dir_all(&dir).ok();
}
// Writes one terminal row older than the retention window and one fresh
// running row directly via `redb_upsert` (bypassing the sink, so no automatic
// prune runs), then checks that an explicit prune removes only the aged row.
#[test]
fn prune_drops_aged_terminal_and_caps_count_but_keeps_running() {
let dir = tmpdir("prune");
std::fs::remove_dir_all(&dir).ok();
let store = JobStore::open(&dir).unwrap();
// Five days past the retention window.
let old_end = now_micros() - (KEEP_DAYS + 5) * 86_400 * 1_000_000;
redb_upsert(
&store.db,
&JobRecord {
job_id: "old".into(),
kind: kind::DOCS_BOOK.into(),
target: "t".into(),
workspace: "w".into(),
status: status::DONE.into(),
ts_start_micros: old_end - 1000,
ts_end_micros: Some(old_end),
elapsed_ms: Some(1),
detail_json: String::new(),
result_ref: String::new(),
},
)
.unwrap();
redb_upsert(
&store.db,
&JobRecord {
job_id: "live".into(),
kind: kind::BENCH_RUN.into(),
target: "t".into(),
workspace: "w".into(),
status: status::RUNNING.into(),
ts_start_micros: now_micros(),
ts_end_micros: None,
elapsed_ms: None,
detail_json: String::new(),
result_ref: String::new(),
},
)
.unwrap();
let removed = store.prune(KEEP_DAYS, CAP).unwrap();
assert_eq!(removed, 1, "only the aged terminal job is pruned");
let left = store.list(&JobSelector::All).unwrap();
assert_eq!(left.len(), 1);
assert_eq!(left[0].job_id, "live", "the running job survives retention");
std::fs::remove_dir_all(&dir).ok();
}
// Pure serde check, no database: a list of records survives a JSON round trip.
#[test]
fn job_record_vec_json_round_trips() {
let recs = vec![JobRecord {
job_id: "j".into(),
kind: kind::ARCH_GENERATE.into(),
target: "skade".into(),
workspace: "nordisk".into(),
status: status::DONE.into(),
ts_start_micros: 1,
ts_end_micros: Some(2),
elapsed_ms: Some(0),
detail_json: "{\"nodes\":310}".into(),
result_ref: "architecture_wiring:xyz".into(),
}];
let s = serde_json::to_string(&recs).unwrap();
let back: Vec<JobRecord> = serde_json::from_str(&s).unwrap();
assert_eq!(recs, back);
}
}