pub mod create_rapina_jobs;
mod model;
pub(crate) mod retry;
pub(crate) mod worker;
pub use model::{JobRow, JobStatus};
pub use retry::RetryPolicy;
pub use worker::JobConfig;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use sea_orm::{ConnectionTrait, DatabaseConnection, DbBackend, Statement};
use uuid::Uuid;
use crate::state::AppState;
pub type JobId = Uuid;
pub type JobResult = Result<(), crate::error::Error>;
pub struct JobRequest {
pub job_type: &'static str,
pub payload: serde_json::Value,
pub queue: &'static str,
pub max_retries: i32,
}
#[doc(hidden)]
pub type JobHandlerFn =
fn(serde_json::Value, Arc<AppState>) -> Pin<Box<dyn Future<Output = JobResult> + Send>>;
pub struct JobDescriptor {
pub job_type: &'static str,
#[doc(hidden)]
pub handle: JobHandlerFn,
#[doc(hidden)]
pub retry_policy: &'static str,
#[doc(hidden)]
pub retry_delay_secs: f64,
}
inventory::collect!(JobDescriptor);
#[derive(Debug, Clone)]
pub struct Jobs {
pool: DatabaseConnection,
pub(crate) trace_id: Option<String>,
}
impl Jobs {
pub fn new(pool: DatabaseConnection, trace_id: Option<String>) -> Self {
Self { pool, trace_id }
}
pub async fn enqueue(&self, req: impl Into<JobRequest>) -> crate::error::Result<JobId> {
insert_job(&self.pool, req.into(), self.trace_id.as_deref()).await
}
pub async fn enqueue_with<C>(
&self,
conn: &C,
req: impl Into<JobRequest>,
) -> crate::error::Result<JobId>
where
C: ConnectionTrait,
{
insert_job(conn, req.into(), self.trace_id.as_deref()).await
}
}
async fn insert_job<C>(
conn: &C,
req: JobRequest,
trace_id: Option<&str>,
) -> crate::error::Result<JobId>
where
C: ConnectionTrait,
{
debug_assert_eq!(
conn.get_database_backend(),
DbBackend::Postgres,
"Jobs require PostgreSQL — rapina_jobs uses gen_random_uuid() and partial indexes"
);
let stmt = build_insert_stmt(req, trace_id);
let row = conn
.query_one(stmt)
.await
.map_err(|e| crate::error::Error::internal(format!("failed to enqueue job: {e}")))?
.ok_or_else(|| crate::error::Error::internal("INSERT INTO rapina_jobs returned no rows"))?;
let id: Uuid = row
.try_get("", "id")
.map_err(|e| crate::error::Error::internal(format!("failed to read job id: {e}")))?;
Ok(id)
}
fn build_insert_stmt(req: JobRequest, trace_id: Option<&str>) -> Statement {
Statement::from_sql_and_values(
DbBackend::Postgres,
"INSERT INTO rapina_jobs (job_type, queue, payload, max_retries, trace_id) \
VALUES ($1, $2, $3, $4, $5) \
RETURNING id",
[
req.job_type.into(),
req.queue.into(),
req.payload.into(),
req.max_retries.into(),
trace_id.map(ToOwned::to_owned).into(),
],
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn job_request_fields() {
let req = JobRequest {
job_type: "send_email",
payload: serde_json::json!({ "to": "test@example.com" }),
queue: "emails",
max_retries: 5,
};
assert_eq!(req.job_type, "send_email");
assert_eq!(req.queue, "emails");
assert_eq!(req.max_retries, 5);
assert_eq!(req.payload["to"], "test@example.com");
}
#[test]
fn default_convention() {
let req = JobRequest {
job_type: "process_event",
payload: serde_json::Value::Null,
queue: "default",
max_retries: 3,
};
assert_eq!(req.queue, "default");
assert_eq!(req.max_retries, 3);
}
#[test]
fn max_retries_is_i32() {
let req = JobRequest {
job_type: "t",
payload: serde_json::Value::Null,
queue: "default",
max_retries: i32::MAX,
};
assert_eq!(req.max_retries, i32::MAX);
}
#[test]
fn insert_stmt_has_correct_sql() {
let req = JobRequest {
job_type: "send_email",
payload: serde_json::json!({"to": "a@b.com"}),
queue: "emails",
max_retries: 5,
};
let stmt = build_insert_stmt(req, Some("trace-123"));
assert!(stmt.sql.contains("INSERT INTO rapina_jobs"));
assert!(stmt.sql.contains("RETURNING id"));
}
#[test]
fn insert_stmt_uses_postgres_backend() {
let req = JobRequest {
job_type: "t",
payload: serde_json::Value::Null,
queue: "default",
max_retries: 3,
};
let stmt = build_insert_stmt(req, None);
assert_eq!(stmt.db_backend, DbBackend::Postgres);
}
#[test]
fn insert_stmt_trace_id_some() {
let req = JobRequest {
job_type: "t",
payload: serde_json::Value::Null,
queue: "default",
max_retries: 3,
};
let stmt = build_insert_stmt(req, Some("abc-123"));
assert_eq!(stmt.values.as_ref().map(|v| v.0.len()), Some(5));
let trace_val = &stmt.values.as_ref().unwrap().0[4];
assert_eq!(
*trace_val,
sea_orm::Value::String(Some(Box::new("abc-123".to_owned())))
);
}
#[test]
fn insert_stmt_trace_id_none() {
let req = JobRequest {
job_type: "t",
payload: serde_json::Value::Null,
queue: "default",
max_retries: 3,
};
let stmt = build_insert_stmt(req, None);
let trace_val = &stmt.values.as_ref().unwrap().0[4];
assert_eq!(*trace_val, sea_orm::Value::String(None));
}
}