use std::time::SystemTime;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid7::Uuid;
use crate::commit::TenantId;
/// Identifier of a job in the queue.
///
/// Wraps a [`Uuid`] and is serialized transparently, so on the wire a `JobId`
/// is indistinguishable from the bare UUID it contains.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct JobId(pub Uuid);

impl JobId {
    /// Creates a fresh job id from `uuid7::uuid7()`.
    ///
    /// Despite the method name, the value is a UUIDv7, whose leading bits are a
    /// Unix timestamp (RFC 9562). Ids are therefore roughly time-ordered rather
    /// than purely random.
    pub fn new_random() -> Self {
        let fresh = uuid7::uuid7();
        JobId(fresh)
    }
}

impl std::fmt::Display for JobId {
    /// Renders the wrapped UUID using the UUID's own `Display` form.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let JobId(inner) = self;
        f.write_fmt(format_args!("{inner}"))
    }
}
/// Identifier of a single lease, i.e. one worker's time-limited claim on a job.
///
/// Wraps a [`Uuid`] and is serialized transparently as that UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct LeaseId(pub Uuid);

impl LeaseId {
    /// Generates a new lease id via `uuid7::uuid7()`.
    ///
    /// The result is a time-ordered UUIDv7, so despite the name it is not
    /// purely random.
    pub fn new_random() -> Self {
        LeaseId(uuid7::uuid7())
    }
}

impl std::fmt::Display for LeaseId {
    /// Writes the underlying UUID's `Display` text.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `LeaseId` is `Copy`, so destructuring by value is free.
        let LeaseId(raw) = *self;
        write!(f, "{raw}")
    }
}
/// Identifier of a worker that acquires leases on jobs.
///
/// Unlike [`JobId`] and [`LeaseId`], this is a free-form, caller-chosen string.
/// It is serialized transparently as that string.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct WorkerId(pub String);

impl WorkerId {
    /// Builds a `WorkerId` from anything convertible into an owned `String`.
    pub fn new(s: impl Into<String>) -> Self {
        let owned: String = s.into();
        WorkerId(owned)
    }
}
/// Lifecycle state of a job in the queue.
///
/// `Succeeded`, `Failed` and `Cancelled` are terminal, and `Pending`/`Leased` are
/// not (see [`JobState::is_terminal`]). [`JobState::as_str`] returns the variant
/// name verbatim, and [`JobState::from_str`] parses exactly those names back.
///
/// NOTE(review): keep the variant order stable. serde's derive passes the variant
/// index to the serializer, and index-based formats (e.g. bincode) encode enums
/// by that index, so reordering variants would change encoded values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum JobState {
/// Not terminal. Presumably enqueued and waiting for a worker to acquire a lease.
Pending,
/// Not terminal. Presumably held by a worker under an active [`Lease`].
Leased,
/// Terminal: the job completed successfully.
Succeeded,
/// Terminal: the job failed.
Failed,
/// Terminal: the job was cancelled.
Cancelled,
}
impl JobState {
    /// Returns the canonical string form of this state: the variant name, verbatim.
    ///
    /// [`JobState::from_str`] accepts exactly these strings, so changing one would
    /// stop previously produced values from parsing.
    ///
    /// This is a `const fn`, so it can also be evaluated in constant contexts.
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            JobState::Pending => "Pending",
            JobState::Leased => "Leased",
            JobState::Succeeded => "Succeeded",
            JobState::Failed => "Failed",
            JobState::Cancelled => "Cancelled",
        }
    }

    /// Parses a string produced by [`JobState::as_str`].
    ///
    /// Matching is exact and case-sensitive. `"pending"`, `" Pending"`, the empty
    /// string, and any other unknown text all yield `None`.
    // This deliberately stays an inherent method returning `Option`, to keep the
    // existing public signature. Its name collides with `std::str::FromStr::from_str`,
    // which trips clippy's `should_implement_trait` lint and would fail
    // `cargo clippy -- -D warnings`. The allow records that the collision is intended.
    #[allow(clippy::should_implement_trait)]
    #[must_use]
    pub fn from_str(s: &str) -> Option<JobState> {
        let state = match s {
            "Pending" => JobState::Pending,
            "Leased" => JobState::Leased,
            "Succeeded" => JobState::Succeeded,
            "Failed" => JobState::Failed,
            "Cancelled" => JobState::Cancelled,
            _ => return None,
        };
        Some(state)
    }

    /// Reports whether this state is final.
    ///
    /// `Succeeded`, `Failed` and `Cancelled` return `true`. `Pending` and
    /// `Leased` return `false`.
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        matches!(
            self,
            JobState::Succeeded | JobState::Failed | JobState::Cancelled
        )
    }
}
/// Final result a worker reports for a leased job via [`JobQueue::complete`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Outcome {
/// The job's work finished successfully.
Succeeded,
/// The job's work failed, and `error_message` describes the failure.
/// NOTE(review): presumably this text ends up in [`JobRecord::error_message`];
/// confirm in the queue implementation.
Failed { error_message: String },
}
/// Request to add a job, passed to [`JobQueue::enqueue`], which returns the new
/// [`JobId`]. Presumably the queue fills in the remaining [`JobRecord`] fields
/// (state, timestamps, lease info). Confirm in the implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NewJob {
/// Tenant the job belongs to.
pub tenant_id: TenantId,
/// Free-form job type name. Presumably this is what `kinds_filter` in
/// [`JobQueue::acquire_lease`] matches against.
pub kind: String,
/// Arbitrary JSON input for the job. No schema is enforced here.
pub payload: serde_json::Value,
/// Scheduling priority. Whether higher or lower values are leased first is up
/// to the [`JobQueue`] implementation; nothing in this type fixes it.
pub priority: u8,
}
/// Stored view of a job, as returned by [`JobQueue::get`] and [`JobQueue::list`]
/// and embedded in a [`Lease`].
///
/// NOTE(review): the lease/completion fields are independent `Option`s, so the
/// type itself does not enforce consistency with `state`. Implementations must
/// keep them in sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRecord {
/// Queue-assigned identifier.
pub id: JobId,
/// Tenant the job belongs to (presumably copied from the originating [`NewJob`]).
pub tenant_id: TenantId,
/// Job type name (presumably copied from the originating [`NewJob`]).
pub kind: String,
/// JSON input for the job (presumably copied from the originating [`NewJob`]).
pub payload: serde_json::Value,
/// Current lifecycle state.
pub state: JobState,
/// Scheduling priority (presumably copied from the originating [`NewJob`]).
pub priority: u8,
/// When the job was created.
pub created_at: SystemTime,
/// Worker holding the current lease, if any. Presumably `Some` only while
/// `state` is `Leased`.
pub leased_by: Option<WorkerId>,
/// When the current lease was acquired, if any.
pub leased_at: Option<SystemTime>,
/// When the current lease lapses, if any.
pub lease_expires_at: Option<SystemTime>,
/// When the job finished. Presumably set once `state` becomes terminal.
pub completed_at: Option<SystemTime>,
/// Failure description. Presumably set when the job ends as `Failed`.
pub error_message: Option<String>,
}
/// A worker's time-limited claim on a job, returned by [`JobQueue::acquire_lease`].
///
/// `lease_id` and `job_id` are the pair that [`JobQueue::heartbeat`] and
/// [`JobQueue::complete`] take back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Lease {
/// Identifier of this lease.
pub lease_id: LeaseId,
/// Job being leased. Presumably equal to `job.id`.
pub job_id: JobId,
/// Worker that holds the lease.
pub worker_id: WorkerId,
/// When the lease was granted.
pub acquired_at: SystemTime,
/// When the lease lapses unless renewed (presumably via [`JobQueue::heartbeat`]).
pub expires_at: SystemTime,
/// Owned copy of the job record. It does not update if the stored job changes later.
pub job: JobRecord,
}
/// Errors returned by [`JobQueue`] operations.
///
/// Each variant's `Display` text comes from its `#[error]` attribute (via
/// `thiserror`). [`JobError::metric_label`] gives a fixed per-variant label.
#[derive(Debug, Clone, PartialEq, Error)]
pub enum JobError {
/// No job with the given id was found.
#[error("job {id} not found")]
NotFound { id: JobId },
/// The job already has a lease, which expires at `expires_at`.
/// NOTE(review): `{expires_at:?}` prints `SystemTime`'s Debug form, which is
/// platform-specific and not a human-friendly timestamp.
#[error("job {id} is already leased; expires at {expires_at:?}")]
AlreadyLeased { id: JobId, expires_at: SystemTime },
/// The given lease is no longer valid for the job (presumably because it
/// expired or was superseded).
#[error("lease {lease_id} is no longer valid for job {job_id}")]
InvalidLease { job_id: JobId, lease_id: LeaseId },
/// The job is already in terminal `state`, so the transition to `attempted`
/// was rejected.
#[error("job {id} is in terminal state {state:?}; cannot transition to {attempted:?}")]
TerminalState {
id: JobId,
state: JobState,
attempted: JobState,
},
/// The storage backend failed. `message` carries the backend's description.
#[error("storage failure: {message}")]
StorageFailure { message: String },
}
impl JobError {
pub fn metric_label(&self) -> &'static str {
match self {
JobError::NotFound { .. } => "not_found",
JobError::AlreadyLeased { .. } => "already_leased",
JobError::InvalidLease { .. } => "invalid_lease",
JobError::TerminalState { .. } => "terminal_state",
JobError::StorageFailure { .. } => "storage_failure",
}
}
}
/// Abstract job queue with lease-based work acquisition.
///
/// The `Send + Sync` supertraits let one implementation be shared across tasks
/// and threads (e.g. behind an `Arc<dyn JobQueue>`). Methods are declared through
/// `#[async_trait]`, which rewrites each `async fn` to return a boxed future.
#[async_trait]
pub trait JobQueue: Send + Sync {
/// Adds `job` to the queue and returns the id assigned to it.
async fn enqueue(&self, job: NewJob) -> Result<JobId, JobError>;
/// Attempts to lease one job for `worker_id`.
///
/// `kinds_filter` and `tenant_filter`, when `Some`, restrict which jobs are
/// eligible. `lease_ttl_secs` is the requested lease duration in seconds.
/// NOTE(review): presumably returns `Ok(None)` when no eligible job is
/// available. Confirm against the implementations.
async fn acquire_lease(
&self,
worker_id: WorkerId,
kinds_filter: Option<Vec<String>>,
tenant_filter: Option<TenantId>,
lease_ttl_secs: u64,
) -> Result<Option<Lease>, JobError>;
/// Renews lease `lease_id` on `job_id` for another `lease_ttl_secs` seconds.
/// NOTE(review): presumably fails with [`JobError::InvalidLease`] if the lease
/// has lapsed or been replaced. Confirm.
async fn heartbeat(
&self,
lease_id: LeaseId,
job_id: JobId,
lease_ttl_secs: u64,
) -> Result<(), JobError>;
/// Finishes the job held under `lease_id` and records `outcome`.
/// NOTE(review): presumably moves the job to `Succeeded`/`Failed` and rejects
/// stale leases with [`JobError::InvalidLease`]. Confirm.
async fn complete(
&self,
lease_id: LeaseId,
job_id: JobId,
outcome: Outcome,
) -> Result<(), JobError>;
/// Cancels `job_id`.
/// NOTE(review): presumably moves it to [`JobState::Cancelled`] and returns
/// [`JobError::TerminalState`] if it has already finished. Confirm.
async fn cancel(&self, job_id: JobId) -> Result<(), JobError>;
/// Fetches the current record for `job_id`. Presumably returns
/// [`JobError::NotFound`] if no such job exists.
async fn get(&self, job_id: JobId) -> Result<JobRecord, JobError>;
/// Lists jobs, optionally filtered by tenant and/or state, presumably returning
/// at most `limit` records. Result ordering is not specified by this trait.
async fn list(
&self,
tenant_filter: Option<TenantId>,
state_filter: Option<JobState>,
limit: usize,
) -> Result<Vec<JobRecord>, JobError>;
/// Presumably expires leases whose `expires_at` has passed and returns how
/// many were affected. Confirm what state such jobs are moved to.
async fn expire_stale_leases(&self) -> Result<usize, JobError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `JobState` variant, so loops below cover the whole enum.
    const ALL_STATES: [JobState; 5] = [
        JobState::Pending,
        JobState::Leased,
        JobState::Succeeded,
        JobState::Failed,
        JobState::Cancelled,
    ];

    #[test]
    fn job_state_strings_are_stable_and_round_trip() {
        for state in ALL_STATES {
            let back = JobState::from_str(state.as_str()).expect("round-trip");
            assert_eq!(state, back);
        }
        // Pin every string form, not just a subset: these are exactly the values
        // `from_str` must keep accepting.
        assert_eq!(JobState::Pending.as_str(), "Pending");
        assert_eq!(JobState::Leased.as_str(), "Leased");
        assert_eq!(JobState::Succeeded.as_str(), "Succeeded");
        assert_eq!(JobState::Failed.as_str(), "Failed");
        assert_eq!(JobState::Cancelled.as_str(), "Cancelled");
    }

    #[test]
    fn unknown_state_string_returns_none() {
        assert!(JobState::from_str("UnknownState").is_none());
        assert!(JobState::from_str("").is_none());
        // Parsing is exact: case and surrounding whitespace matter.
        assert!(JobState::from_str("pending").is_none());
        assert!(JobState::from_str(" Pending").is_none());
    }

    #[test]
    fn terminal_states_are_classified_correctly() {
        assert!(!JobState::Pending.is_terminal());
        assert!(!JobState::Leased.is_terminal());
        assert!(JobState::Succeeded.is_terminal());
        assert!(JobState::Failed.is_terminal());
        assert!(JobState::Cancelled.is_terminal());
    }

    #[test]
    fn job_error_metric_labels_are_stable() {
        let job_id = JobId::new_random();
        let lease_id = LeaseId::new_random();
        assert_eq!(
            JobError::NotFound { id: job_id }.metric_label(),
            "not_found"
        );
        assert_eq!(
            JobError::AlreadyLeased {
                id: job_id,
                expires_at: SystemTime::UNIX_EPOCH,
            }
            .metric_label(),
            "already_leased"
        );
        assert_eq!(
            JobError::InvalidLease { job_id, lease_id }.metric_label(),
            "invalid_lease"
        );
        assert_eq!(
            JobError::TerminalState {
                id: job_id,
                state: JobState::Succeeded,
                attempted: JobState::Pending,
            }
            .metric_label(),
            "terminal_state"
        );
        assert_eq!(
            JobError::StorageFailure {
                message: "x".into(),
            }
            .metric_label(),
            "storage_failure"
        );
    }

    #[test]
    fn id_display_matches_inner_uuid() {
        let job_id = JobId::new_random();
        let lease_id = LeaseId::new_random();
        assert_eq!(job_id.to_string(), job_id.0.to_string());
        assert_eq!(lease_id.to_string(), lease_id.0.to_string());
    }

    #[test]
    fn job_error_messages_are_stable() {
        let job_id = JobId::new_random();
        let lease_id = LeaseId::new_random();
        assert_eq!(
            JobError::NotFound { id: job_id }.to_string(),
            format!("job {job_id} not found")
        );
        assert_eq!(
            JobError::AlreadyLeased {
                id: job_id,
                expires_at: SystemTime::UNIX_EPOCH,
            }
            .to_string(),
            format!(
                "job {job_id} is already leased; expires at {:?}",
                SystemTime::UNIX_EPOCH
            )
        );
        assert_eq!(
            JobError::InvalidLease { job_id, lease_id }.to_string(),
            format!("lease {lease_id} is no longer valid for job {job_id}")
        );
        assert_eq!(
            JobError::TerminalState {
                id: job_id,
                state: JobState::Succeeded,
                attempted: JobState::Pending,
            }
            .to_string(),
            format!("job {job_id} is in terminal state Succeeded; cannot transition to Pending")
        );
        assert_eq!(
            JobError::StorageFailure {
                message: "disk full".into(),
            }
            .to_string(),
            "storage failure: disk full"
        );
    }
}