mod error;
pub mod postgres;
mod types;
pub use error::JobsError;
pub use postgres::PostgresJobsStore;
pub use types::{FailedJob, Job, JobKind, JobState};
use std::future::Future;
/// Asynchronous job-queue storage interface.
///
/// Every method returns a `Send` future, and implementors must be
/// `Send + Sync + 'static`.
///
/// NOTE(review): the per-method notes below describe what the in-file test
/// stub does; the Postgres implementation is not visible from this file, so
/// confirm the details against it.
pub trait MemoryJobsStore: Send + Sync + 'static {
/// Adds a job of `kind` for `source_pid` carrying `payload`.
///
/// Resolves to an `i64`; the test stub returns the id of the new row.
fn enqueue(
&self,
kind: JobKind,
source_pid: String,
payload: serde_json::Value,
) -> impl Future<Output = Result<i64, JobsError>> + Send;
/// Conditionally enqueues a synthesis job for `source_pid`; resolves to
/// whether a row was inserted.
///
/// In the test stub, any row other than `caller_job_id` that shares
/// `source_pid` and is an extract, relational-extract, or synthesize job
/// blocks the insert.
fn enqueue_synthesis_if_ready(
&self,
source_pid: &str,
caller_job_id: i64,
) -> impl Future<Output = Result<bool, JobsError>> + Send;
/// Claims one job, optionally tagging it with `claimed_by`; resolves to
/// `None` when nothing is claimable (the stub takes the first pending row).
fn claim(&self, claimed_by: Option<&str>) -> impl Future<Output = Result<Option<Job>, JobsError>> + Send;
/// Marks job `id` as finished. The stub removes the claimed row and reports
/// `JobsError::NotFound` when no claimed row has that id.
fn complete(&self, id: i64) -> impl Future<Output = Result<(), JobsError>> + Send;
/// Records a failed attempt of job `id` with `reason`. The stub bumps the
/// attempt counter and parks the row as failed once it reaches
/// `max_attempts`, otherwise returns it to pending.
fn fail(&self, id: i64, reason: String, max_attempts: i32) -> impl Future<Output = Result<(), JobsError>> + Send;
/// Returns claimed jobs whose claim is older than `lease` to the queue;
/// resolves to a count (the stub counts the rows it re-pended).
fn reset_expired_leases(&self, lease: std::time::Duration) -> impl Future<Output = Result<u64, JobsError>> + Send;
/// Lists at most `limit` failed jobs (the stub orders them by most recent
/// `updated_at` first).
fn list_failed(&self, limit: usize) -> impl Future<Output = Result<Vec<FailedJob>, JobsError>> + Send;
/// Re-queues failed job `id`. The stub resets attempts and the failure
/// reason, and reports `JobsError::NotFound` when no failed row has that id.
fn retry_job(&self, id: i64) -> impl Future<Output = Result<(), JobsError>> + Send;
/// Re-queues failed jobs, optionally restricted to `kind`; resolves to a
/// count. In the stub, `dry_run` counts the matching rows without changing them.
fn bulk_retry(&self, kind: Option<JobKind>, dry_run: bool) -> impl Future<Output = Result<u64, JobsError>> + Send;
/// Deletes failed job `id`. The stub reports `JobsError::NotFound` when no
/// failed row has that id.
fn delete_failed(&self, id: i64) -> impl Future<Output = Result<(), JobsError>> + Send;
/// Resolves to a count of pending jobs (the stub counts rows in the pending state).
fn pending_count(&self) -> impl Future<Output = Result<u64, JobsError>> + Send;
}
// Compile-time helper: it can only be instantiated for types that are `Send`.
const fn assert_send<T: Send>() {}
// Evaluated at compile time; the build fails if any type listed here stops
// being `Send`.
const _: () = {
assert_send::<Job>();
assert_send::<FailedJob>();
assert_send::<JobKind>();
assert_send::<JobState>();
assert_send::<JobsError>();
assert_send::<PostgresJobsStore>();
};
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use std::sync::Mutex;
use std::time::Duration;
/// In-memory `MemoryJobsStore` used by the tests in this module.
#[derive(Default)]
struct StubJobsStore {
/// Every job row currently held, in insertion order.
rows: Mutex<Vec<Job>>,
/// Last id handed out; `Default` starts it at 0, so the first allocated id is 1.
next_id: Mutex<i64>,
}
impl StubJobsStore {
    /// Advances the id counter by one and returns the new value, so the first
    /// id handed out by a default-constructed store is 1.
    fn alloc_id(&self) -> i64 {
        let mut counter = self.next_id.lock().unwrap();
        let fresh = *counter + 1;
        *counter = fresh;
        fresh
    }
}
impl MemoryJobsStore for StubJobsStore {
/// Appends a new pending row with zero attempts and no claim or failure
/// metadata, and returns its freshly allocated id.
async fn enqueue(
    &self,
    kind: JobKind,
    source_pid: String,
    payload: serde_json::Value,
) -> Result<i64, JobsError> {
    let id = self.alloc_id();
    // One timestamp serves as both creation and last-update time.
    let stamp: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
    self.rows.lock().unwrap().push(Job {
        id,
        source_pid,
        kind,
        state: JobState::Pending,
        payload,
        attempts: 0,
        failure_reason: None,
        claimed_at: None,
        claimed_by: None,
        created_at: stamp,
        updated_at: stamp,
    });
    Ok(id)
}
/// Inserts a pending `Synthesize` row for `source_pid` unless another row for
/// that source still blocks it.
///
/// A row blocks when it is not the caller's own (`id != caller_job_id`), has
/// the same `source_pid`, and is an `Extract`, `RelationalExtract`, or
/// `Synthesize` job. The row's state is not consulted, so a terminally failed
/// sibling or an already-enqueued synthesize row blocks as well.
///
/// Resolves to `true` only when a row was inserted.
async fn enqueue_synthesis_if_ready(&self, source_pid: &str, caller_job_id: i64) -> Result<bool, JobsError> {
    // `rows` stays locked from the readiness check through the insert so the
    // two steps form a single critical section.
    let mut rows = self.rows.lock().unwrap();
    let blocked = rows.iter().any(|r| {
        r.id != caller_job_id
            && r.source_pid == source_pid
            && matches!(
                r.kind,
                JobKind::Extract | JobKind::RelationalExtract | JobKind::Synthesize
            )
    });
    if blocked {
        return Ok(false);
    }
    // Use the shared allocator instead of duplicating its increment logic.
    let id = self.alloc_id();
    let now: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
    rows.push(Job {
        id,
        source_pid: source_pid.to_owned(),
        kind: JobKind::Synthesize,
        state: JobState::Pending,
        payload: serde_json::json!({}),
        attempts: 0,
        failure_reason: None,
        claimed_at: None,
        claimed_by: None,
        created_at: now,
        updated_at: now,
    });
    Ok(true)
}
/// Claims the first pending row in insertion order: marks it claimed, stamps
/// the claim time and claimant, and returns a copy. Resolves to `None` when no
/// row is pending.
async fn claim(&self, claimed_by: Option<&str>) -> Result<Option<Job>, JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let stamp: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
    let claimed = guard
        .iter_mut()
        .find(|job| job.state == JobState::Pending)
        .map(|job| {
            job.state = JobState::Claimed;
            job.claimed_at = Some(stamp);
            job.claimed_by = claimed_by.map(str::to_owned);
            job.updated_at = stamp;
            job.clone()
        });
    Ok(claimed)
}
/// Drops the claimed row with this `id`; reports `NotFound` when nothing was
/// removed.
async fn complete(&self, id: i64) -> Result<(), JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let len_before = guard.len();
    // Keep every row except a claimed one carrying the requested id.
    guard.retain(|job| job.id != id || job.state != JobState::Claimed);
    if guard.len() < len_before {
        Ok(())
    } else {
        Err(JobsError::NotFound(id.to_string()))
    }
}
/// Records a failed attempt on the claimed row with this `id`.
///
/// The attempt counter is bumped, the reason stored, and the claim released.
/// The row becomes `Failed` once its attempts reach `max_attempts`, otherwise
/// it returns to `Pending`. Reports `NotFound` when no claimed row matches.
async fn fail(&self, id: i64, reason: String, max_attempts: i32) -> Result<(), JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let target = guard
        .iter_mut()
        .find(|job| job.id == id && job.state == JobState::Claimed);
    match target {
        None => Err(JobsError::NotFound(id.to_string())),
        Some(job) => {
            job.attempts += 1;
            job.failure_reason = Some(reason);
            job.claimed_at = None;
            job.claimed_by = None;
            let exhausted = job.attempts >= max_attempts;
            job.state = if exhausted {
                JobState::Failed
            } else {
                JobState::Pending
            };
            job.updated_at = Utc::now().into();
            Ok(())
        }
    }
}
/// Returns claimed rows whose claim is older than `lease` to `Pending`,
/// clearing their claim metadata, and resolves to how many rows were reset.
///
/// A `lease` too large for chrono to represent, or one that would push the
/// cutoff before the earliest representable instant, cannot have expired for
/// any row, so nothing is reset and the call resolves to `Ok(0)`.
async fn reset_expired_leases(&self, lease: Duration) -> Result<u64, JobsError> {
    let mut rows = self.rows.lock().unwrap();
    let now = Utc::now();
    // Falling back to a zero lease on conversion failure would make every
    // claimed row look expired, which is the opposite of what a huge lease
    // means. Use checked arithmetic so an extreme lease cannot panic either.
    let cutoff = chrono::Duration::from_std(lease)
        .ok()
        .and_then(|lease| now.checked_sub_signed(lease));
    let Some(cutoff) = cutoff else {
        return Ok(0);
    };
    let mut recovered = 0u64;
    for row in rows.iter_mut() {
        if row.state != JobState::Claimed {
            continue;
        }
        // A claimed row without a claim timestamp is left alone.
        let Some(claimed_at) = row.claimed_at else { continue };
        if claimed_at < cutoff {
            row.state = JobState::Pending;
            row.claimed_at = None;
            row.claimed_by = None;
            row.updated_at = now.into();
            recovered += 1;
        }
    }
    Ok(recovered)
}
/// Lists up to `limit` failed rows, most recently updated first.
///
/// Rows sharing the same `updated_at` are ordered by id, highest first, so the
/// result is deterministic even when two failures land on the same clock
/// reading.
async fn list_failed(&self, limit: usize) -> Result<Vec<FailedJob>, JobsError> {
    let rows = self.rows.lock().unwrap();
    let mut out: Vec<FailedJob> = rows
        .iter()
        .filter(|r| r.state == JobState::Failed)
        .map(|r| FailedJob {
            id: r.id,
            source_pid: r.source_pid.clone(),
            kind: r.kind,
            attempts: r.attempts,
            failure_reason: r.failure_reason.clone(),
            updated_at: r.updated_at,
        })
        .collect();
    // Without the id tie-break, equal timestamps keep insertion order
    // (oldest first), which makes the ordering depend on clock resolution.
    out.sort_by(|a, b| {
        b.updated_at
            .cmp(&a.updated_at)
            .then_with(|| b.id.cmp(&a.id))
    });
    out.truncate(limit);
    Ok(out)
}
/// Puts the failed row with this `id` back in the queue with a clean slate:
/// zero attempts, no failure reason, no claim. Reports `NotFound` when no
/// failed row matches.
async fn retry_job(&self, id: i64) -> Result<(), JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let target = guard
        .iter_mut()
        .find(|job| job.id == id && job.state == JobState::Failed);
    match target {
        Some(job) => {
            job.state = JobState::Pending;
            job.attempts = 0;
            job.failure_reason = None;
            job.claimed_at = None;
            job.claimed_by = None;
            job.updated_at = Utc::now().into();
            Ok(())
        }
        None => Err(JobsError::NotFound(id.to_string())),
    }
}
/// Re-queues every failed row, optionally restricted to `kind`, and resolves
/// to the number of matching rows. With `dry_run` set the rows are only
/// counted, never modified.
async fn bulk_retry(&self, kind: Option<JobKind>, dry_run: bool) -> Result<u64, JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let stamp: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
    // Failed rows that also pass the optional kind filter.
    let selected = guard.iter_mut().filter(|job| {
        job.state == JobState::Failed
            && match kind {
                Some(wanted) => job.kind == wanted,
                None => true,
            }
    });
    let mut matched = 0u64;
    for job in selected {
        matched += 1;
        if !dry_run {
            job.state = JobState::Pending;
            job.attempts = 0;
            job.failure_reason = None;
            job.claimed_at = None;
            job.claimed_by = None;
            job.updated_at = stamp;
        }
    }
    Ok(matched)
}
/// Drops the failed row with this `id`; reports `NotFound` when nothing was
/// removed.
async fn delete_failed(&self, id: i64) -> Result<(), JobsError> {
    let mut guard = self.rows.lock().unwrap();
    let len_before = guard.len();
    // Keep every row except a failed one carrying the requested id.
    guard.retain(|job| job.id != id || job.state != JobState::Failed);
    if guard.len() < len_before {
        Ok(())
    } else {
        Err(JobsError::NotFound(id.to_string()))
    }
}
/// Counts the rows currently in the `Pending` state.
async fn pending_count(&self) -> Result<u64, JobsError> {
    let pending = self
        .rows
        .lock()
        .unwrap()
        .iter()
        .filter(|job| job.state == JobState::Pending)
        .count();
    Ok(pending as u64)
}
}
/// A freshly enqueued job is the one handed out by the next claim, and it
/// comes back in the claimed state.
#[tokio::test(flavor = "current_thread")]
async fn should_enqueue_then_claim_the_same_row() {
    let store = StubJobsStore::default();
    let enqueued_id = store
        .enqueue(JobKind::Embed, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();

    let job = store.claim(None).await.unwrap().expect("row to claim");

    assert_eq!(job.id, enqueued_id);
    assert_eq!(job.state, JobState::Claimed);
}
/// Claiming from an empty store yields nothing.
#[tokio::test(flavor = "current_thread")]
async fn should_return_none_when_queue_is_empty() {
    let store = StubJobsStore::default();
    let claimed = store.claim(None).await.unwrap();
    assert!(claimed.is_none());
}
/// Completing a claimed job removes it, leaving nothing to claim.
#[tokio::test(flavor = "current_thread")]
async fn should_complete_remove_the_row() {
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Extract, String::from("pid_y"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();

    store.complete(job_id).await.unwrap();

    let leftover = store.claim(None).await.unwrap();
    assert!(leftover.is_none());
}
/// A failure below the attempt ceiling re-pends the job with the attempt
/// counted and the reason recorded.
#[tokio::test(flavor = "current_thread")]
async fn should_fail_bump_attempts_and_return_to_pending_when_under_max() {
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Embed, String::from("pid_z"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();

    store.fail(job_id, String::from("boom"), 3).await.unwrap();

    let reclaimed = store
        .claim(None)
        .await
        .unwrap()
        .expect("row should be claimable again");
    assert_eq!(reclaimed.attempts, 1);
    assert_eq!(reclaimed.failure_reason.as_deref(), Some("boom"));
}
/// Once the attempt ceiling is reached the job is no longer claimable.
#[tokio::test(flavor = "current_thread")]
async fn should_fail_terminal_at_max_attempts() {
    const MAX_ATTEMPTS: i32 = 3;
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Embed, String::from("pid_w"), serde_json::json!({}))
        .await
        .unwrap();

    // Claim and fail the same job until its attempts are used up.
    for _ in 0..MAX_ATTEMPTS {
        store.claim(None).await.unwrap();
        store
            .fail(job_id, String::from("boom"), MAX_ATTEMPTS)
            .await
            .unwrap();
    }

    let after = store.claim(None).await.unwrap();
    assert!(after.is_none());
}
/// `list_failed` reports only failed rows and puts the later failure first.
#[tokio::test(flavor = "current_thread")]
async fn should_list_failed_return_only_failed_rows_newest_first() {
    let store = StubJobsStore::default();

    let older = store
        .enqueue(JobKind::Extract, String::from("pid_a"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(older, String::from("boom-a"), 1).await.unwrap();

    let newer = store
        .enqueue(JobKind::Extract, String::from("pid_b"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(newer, String::from("boom-b"), 1).await.unwrap();

    // A still-pending row that must not appear in the listing.
    store
        .enqueue(JobKind::Embed, String::from("pid_pending"), serde_json::json!({}))
        .await
        .unwrap();

    let listed = store.list_failed(10).await.unwrap();
    assert_eq!(listed.len(), 2);
    let newest = &listed[0];
    assert_eq!(newest.id, newer);
    assert_eq!(newest.source_pid, "pid_b");
    assert_eq!(newest.failure_reason.as_deref(), Some("boom-b"));
}
/// `list_failed` never returns more rows than the requested limit.
#[tokio::test(flavor = "current_thread")]
async fn should_list_failed_respect_limit() {
    let store = StubJobsStore::default();
    // Produce three terminally failed rows.
    for n in 0..3 {
        let job_id = store
            .enqueue(JobKind::Embed, format!("pid_{n}"), serde_json::json!({}))
            .await
            .unwrap();
        store.claim(None).await.unwrap();
        store.fail(job_id, String::from("boom"), 1).await.unwrap();
    }

    let listed = store.list_failed(2).await.unwrap();
    assert_eq!(listed.len(), 2);
}
/// Retrying a failed job makes it claimable again with a clean slate.
#[tokio::test(flavor = "current_thread")]
async fn should_retry_job_clear_attempts_and_pend() {
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Embed, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(job_id, String::from("boom"), 1).await.unwrap();

    store.retry_job(job_id).await.unwrap();

    let retried = store
        .claim(None)
        .await
        .unwrap()
        .expect("retried job should be claimable");
    assert_eq!(retried.id, job_id);
    assert_eq!(retried.attempts, 0, "retry resets attempts to zero");
    assert!(retried.failure_reason.is_none(), "retry clears failure reason");
}
/// Retrying an unknown id is reported as `NotFound`.
#[tokio::test(flavor = "current_thread")]
async fn should_retry_job_return_not_found_when_id_missing() {
    let store = StubJobsStore::default();
    let outcome = store.retry_job(999).await;
    assert!(matches!(outcome.unwrap_err(), JobsError::NotFound(_)));
}
/// A kind-filtered bulk retry touches only failed rows of that kind.
#[tokio::test(flavor = "current_thread")]
async fn should_bulk_retry_with_kind_filter() {
    let store = StubJobsStore::default();

    let embed_id = store
        .enqueue(JobKind::Embed, String::from("pid_e"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(embed_id, String::from("boom"), 1).await.unwrap();

    let extract_id = store
        .enqueue(JobKind::Extract, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(extract_id, String::from("boom"), 1).await.unwrap();

    let retried = store
        .bulk_retry(Some(JobKind::Extract), false)
        .await
        .unwrap();
    assert_eq!(retried, 1, "only the extract row should be affected");

    // The embed row is the only one still failed.
    let still_failed = store.list_failed(10).await.unwrap();
    assert_eq!(still_failed.len(), 1);
    assert_eq!(still_failed[0].id, embed_id);
}
/// A dry-run bulk retry reports the match count but leaves rows failed.
#[tokio::test(flavor = "current_thread")]
async fn should_bulk_retry_dry_run_count_without_modifying() {
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Embed, String::from("pid"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(job_id, String::from("boom"), 1).await.unwrap();

    let would_retry = store.bulk_retry(None, true).await.unwrap();
    assert_eq!(would_retry, 1, "dry_run should still report the count");

    let still_failed = store.list_failed(10).await.unwrap();
    assert_eq!(still_failed.len(), 1, "dry_run must NOT modify rows");
}
/// Deleting a failed job removes it from the failed listing.
#[tokio::test(flavor = "current_thread")]
async fn should_delete_failed_remove_row() {
    let store = StubJobsStore::default();
    let job_id = store
        .enqueue(JobKind::Embed, String::from("pid"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();
    store.fail(job_id, String::from("boom"), 1).await.unwrap();

    store.delete_failed(job_id).await.unwrap();

    let remaining = store.list_failed(10).await.unwrap();
    assert!(remaining.is_empty());
}
/// Deleting an unknown id is reported as `NotFound`.
#[tokio::test(flavor = "current_thread")]
async fn should_delete_failed_return_not_found_when_id_missing() {
    let store = StubJobsStore::default();
    let outcome = store.delete_failed(999).await;
    assert!(matches!(outcome.unwrap_err(), JobsError::NotFound(_)));
}
/// The pending count tracks enqueues and drops by one when a job is claimed.
#[tokio::test(flavor = "current_thread")]
async fn should_pending_count_reflect_queue_state() {
    let store = StubJobsStore::default();
    assert_eq!(store.pending_count().await.unwrap(), 0);

    store
        .enqueue(JobKind::Embed, String::from("pid_a"), serde_json::json!({}))
        .await
        .unwrap();
    store
        .enqueue(JobKind::Embed, String::from("pid_b"), serde_json::json!({}))
        .await
        .unwrap();
    assert_eq!(store.pending_count().await.unwrap(), 2);

    store.claim(None).await.unwrap();
    assert_eq!(store.pending_count().await.unwrap(), 1);
}
// Caller id that matches no row: the stub hands out ids starting at 1, so 0
// never excludes a real row from the sibling check.
const NO_CALLER: i64 = 0;
/// With no rows for the source at all, synthesis is enqueued and claimable.
#[tokio::test(flavor = "current_thread")]
async fn should_enqueue_synthesis_when_no_siblings_remain() {
    let store = StubJobsStore::default();

    let fired = store
        .enqueue_synthesis_if_ready("pid_done", NO_CALLER)
        .await
        .unwrap();
    assert!(fired, "no sibling rows -> synthesis fires");

    let job = store.claim(None).await.unwrap().expect("synthesize row");
    assert_eq!(job.kind, JobKind::Synthesize);
}
/// The caller's own row does not count as a blocking sibling.
#[tokio::test(flavor = "current_thread")]
async fn should_enqueue_synthesis_when_only_the_callers_own_row_remains() {
    let store = StubJobsStore::default();
    let own_id = store
        .enqueue(
            JobKind::RelationalExtract,
            String::from("pid_x"),
            serde_json::json!({}),
        )
        .await
        .unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", own_id)
        .await
        .unwrap();
    assert!(fired, "only the caller's own row remains -> synthesis fires");
}
/// An extract row for the same source blocks synthesis.
#[tokio::test(flavor = "current_thread")]
async fn should_not_enqueue_synthesis_while_extract_sibling_present() {
    let store = StubJobsStore::default();
    store
        .enqueue(JobKind::Extract, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();
    assert!(!fired, "extract sibling still present -> no synthesis");
}
/// A relational-extract row for the same source blocks synthesis.
#[tokio::test(flavor = "current_thread")]
async fn should_not_enqueue_synthesis_while_relational_sibling_present() {
    let store = StubJobsStore::default();
    store
        .enqueue(
            JobKind::RelationalExtract,
            String::from("pid_x"),
            serde_json::json!({}),
        )
        .await
        .unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();
    assert!(!fired, "relational sibling still present -> no synthesis");
}
/// A sibling that failed for good still blocks synthesis.
#[tokio::test(flavor = "current_thread")]
async fn should_not_enqueue_synthesis_while_a_sibling_is_terminally_failed() {
    let store = StubJobsStore::default();
    let sibling_id = store
        .enqueue(
            JobKind::RelationalExtract,
            String::from("pid_x"),
            serde_json::json!({}),
        )
        .await
        .unwrap();
    store
        .claim(None)
        .await
        .unwrap()
        .expect("the relational row is claimable");
    // One allowed attempt, so this failure is terminal.
    store.fail(sibling_id, String::from("boom"), 1).await.unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();
    assert!(!fired, "a terminally-failed sibling still blocks synthesis");
}
/// Excluding the caller's row does not hide a genuine other sibling.
#[tokio::test(flavor = "current_thread")]
async fn should_not_enqueue_synthesis_when_a_real_sibling_outlives_the_caller() {
    let store = StubJobsStore::default();
    let own_id = store
        .enqueue(
            JobKind::RelationalExtract,
            String::from("pid_x"),
            serde_json::json!({}),
        )
        .await
        .unwrap();
    store
        .enqueue(JobKind::Extract, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", own_id)
        .await
        .unwrap();
    assert!(!fired, "a real other sibling outlives the caller -> no synthesis");
}
/// A second readiness check sees the first call's synthesize row and backs off.
#[tokio::test(flavor = "current_thread")]
async fn should_enqueue_synthesis_exactly_once_regardless_of_order() {
    let store = StubJobsStore::default();

    let initial = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();
    let repeat = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();

    assert!(initial, "first call inserts");
    assert!(!repeat, "second call sees the existing synthesize row -> no duplicate");
}
/// Embed and categorize rows for the same source do not block synthesis.
#[tokio::test(flavor = "current_thread")]
async fn should_ignore_unrelated_kinds_when_checking_readiness() {
    let store = StubJobsStore::default();
    store
        .enqueue(JobKind::Embed, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();
    store
        .enqueue(JobKind::Categorize, String::from("pid_x"), serde_json::json!({}))
        .await
        .unwrap();

    let fired = store
        .enqueue_synthesis_if_ready("pid_x", NO_CALLER)
        .await
        .unwrap();
    assert!(fired, "embed/categorize are not synthesis parents -> synthesis fires");
}
/// A claim older than the lease is released and can be claimed again.
#[tokio::test(flavor = "current_thread")]
async fn should_reset_expired_leases_re_pend_old_claims() {
    let store = StubJobsStore::default();
    store
        .enqueue(JobKind::Embed, String::from("pid_a"), serde_json::json!({}))
        .await
        .unwrap();
    store.claim(None).await.unwrap();

    // Backdate the claim by two minutes so it is well past the 30 s lease below.
    {
        let mut guard = store.rows.lock().unwrap();
        let backdated: chrono::DateTime<chrono::FixedOffset> =
            (Utc::now() - chrono::Duration::seconds(120)).into();
        guard[0].claimed_at = Some(backdated);
    }

    let released = store
        .reset_expired_leases(Duration::from_secs(30))
        .await
        .unwrap();
    assert_eq!(released, 1);

    let reclaimed = store
        .claim(None)
        .await
        .unwrap()
        .expect("row should be claimable again");
    assert_eq!(reclaimed.state, JobState::Claimed);
}
}