use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use chrono::{DateTime, Utc};
use forge_core::function::JobDispatch;
use forge_core::job::{ForgeJob, JobInfo, JobPriority};
use uuid::Uuid;
use super::queue::{JobQueue, JobRecord};
use super::registry::JobRegistry;
/// Front door for submitting and cancelling background jobs.
///
/// Pairs a [`JobQueue`] (where records are enqueued) with a [`JobRegistry`]
/// (where job metadata is looked up by name). The type derives `Clone`, so
/// cloning it clones both fields.
#[derive(Clone)]
pub struct JobDispatcher {
/// Queue that receives every record this dispatcher builds.
queue: JobQueue,
/// Registry consulted when a job is dispatched by its string name.
registry: JobRegistry,
}
impl JobDispatcher {
pub fn new(queue: JobQueue, registry: JobRegistry) -> Self {
Self { queue, registry }
}
pub async fn dispatch<J: ForgeJob>(
&self,
args: J::Args,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError>
where
J::Args: serde::Serialize,
{
let info = J::info();
self.dispatch_with_info(&info, serde_json::to_value(args)?, owner_subject)
.await
}
pub async fn dispatch_in<J: ForgeJob>(
&self,
delay: Duration,
args: J::Args,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError>
where
J::Args: serde::Serialize,
{
let info = J::info();
let scheduled_at = Utc::now()
+ chrono::Duration::from_std(delay)
.map_err(|_| forge_core::ForgeError::InvalidArgument("delay too large".into()))?;
self.dispatch_at_with_info(
&info,
serde_json::to_value(args)?,
scheduled_at,
owner_subject,
)
.await
}
pub async fn dispatch_at<J: ForgeJob>(
&self,
at: DateTime<Utc>,
args: J::Args,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError>
where
J::Args: serde::Serialize,
{
let info = J::info();
self.dispatch_at_with_info(&info, serde_json::to_value(args)?, at, owner_subject)
.await
}
pub async fn dispatch_by_name(
&self,
job_type: &str,
args: serde_json::Value,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError> {
let entry = self.registry.get(job_type).ok_or_else(|| {
forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
})?;
self.dispatch_with_info(&entry.info, args, owner_subject)
.await
}
async fn dispatch_with_info(
&self,
info: &JobInfo,
args: serde_json::Value,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError> {
let mut job = JobRecord::new(
info.name,
args,
info.priority,
info.retry.max_attempts as i32,
)
.with_owner_subject(owner_subject);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
pub async fn cancel(
&self,
job_id: Uuid,
reason: Option<&str>,
caller_subject: Option<&str>,
) -> Result<bool, forge_core::ForgeError> {
self.queue
.request_cancel(job_id, reason, caller_subject)
.await
.map_err(forge_core::ForgeError::Database)
}
async fn dispatch_at_with_info(
&self,
info: &JobInfo,
args: serde_json::Value,
scheduled_at: DateTime<Utc>,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError> {
let mut job = JobRecord::new(
info.name,
args,
info.priority,
info.retry.max_attempts as i32,
)
.with_scheduled_at(scheduled_at)
.with_owner_subject(owner_subject);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
async fn dispatch_with_info_and_tenant(
&self,
info: &JobInfo,
args: serde_json::Value,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Result<Uuid, forge_core::ForgeError> {
let mut job = JobRecord::new(
info.name,
args,
info.priority,
info.retry.max_attempts as i32,
)
.with_owner_subject(owner_subject)
.with_tenant_id(tenant_id);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
async fn dispatch_at_with_info_and_tenant(
&self,
info: &JobInfo,
args: serde_json::Value,
scheduled_at: DateTime<Utc>,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Result<Uuid, forge_core::ForgeError> {
let mut job = JobRecord::new(
info.name,
args,
info.priority,
info.retry.max_attempts as i32,
)
.with_scheduled_at(scheduled_at)
.with_owner_subject(owner_subject)
.with_tenant_id(tenant_id);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
pub async fn dispatch_idempotent<J: ForgeJob>(
&self,
idempotency_key: impl Into<String>,
args: J::Args,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError>
where
J::Args: serde::Serialize,
{
let info = J::info();
let mut job = JobRecord::new(
info.name,
serde_json::to_value(args)?,
info.priority,
info.retry.max_attempts as i32,
)
.with_idempotency_key(idempotency_key)
.with_owner_subject(owner_subject);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
pub async fn dispatch_with_priority<J: ForgeJob>(
&self,
priority: JobPriority,
args: J::Args,
owner_subject: Option<String>,
) -> Result<Uuid, forge_core::ForgeError>
where
J::Args: serde::Serialize,
{
let info = J::info();
let mut job = JobRecord::new(
info.name,
serde_json::to_value(args)?,
priority,
info.retry.max_attempts as i32,
)
.with_owner_subject(owner_subject);
if let Some(cap) = info.worker_capability {
job = job.with_capability(cap);
}
self.queue
.enqueue(job)
.await
.map_err(forge_core::ForgeError::Database)
}
}
impl JobDispatch for JobDispatcher {
fn get_info(&self, job_type: &str) -> Option<JobInfo> {
self.registry.get(job_type).map(|e| e.info.clone())
}
fn dispatch_by_name(
&self,
job_type: &str,
args: serde_json::Value,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
let job_type = job_type.to_string();
Box::pin(async move {
let entry = self.registry.get(&job_type).ok_or_else(|| {
forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
})?;
self.dispatch_with_info_and_tenant(&entry.info, args, owner_subject, tenant_id)
.await
})
}
fn dispatch_by_name_at(
&self,
job_type: &str,
args: serde_json::Value,
scheduled_at: DateTime<Utc>,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + '_>> {
let job_type = job_type.to_string();
Box::pin(async move {
let entry = self.registry.get(&job_type).ok_or_else(|| {
forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
})?;
self.dispatch_at_with_info_and_tenant(
&entry.info,
args,
scheduled_at,
owner_subject,
tenant_id,
)
.await
})
}
fn dispatch_in_conn<'a>(
&'a self,
conn: &'a mut sqlx::PgConnection,
job_type: &'a str,
args: serde_json::Value,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + 'a>> {
Box::pin(async move {
let entry = self.registry.get(job_type).ok_or_else(|| {
forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
})?;
let mut record = JobRecord::new(
entry.info.name,
args,
entry.info.priority,
entry.info.retry.max_attempts as i32,
)
.with_owner_subject(owner_subject)
.with_tenant_id(tenant_id);
if let Some(cap) = entry.info.worker_capability {
record = record.with_capability(cap);
}
self.queue
.enqueue_in_conn(conn, record)
.await
.map_err(forge_core::ForgeError::Database)
})
}
fn dispatch_in_conn_at<'a>(
&'a self,
conn: &'a mut sqlx::PgConnection,
job_type: &'a str,
args: serde_json::Value,
scheduled_at: DateTime<Utc>,
owner_subject: Option<String>,
tenant_id: Option<Uuid>,
) -> Pin<Box<dyn Future<Output = forge_core::Result<Uuid>> + Send + 'a>> {
Box::pin(async move {
let entry = self.registry.get(job_type).ok_or_else(|| {
forge_core::ForgeError::NotFound(format!("Job type '{}' not found", job_type))
})?;
let mut record = JobRecord::new(
entry.info.name,
args,
entry.info.priority,
entry.info.retry.max_attempts as i32,
)
.with_scheduled_at(scheduled_at)
.with_owner_subject(owner_subject)
.with_tenant_id(tenant_id);
if let Some(cap) = entry.info.worker_capability {
record = record.with_capability(cap);
}
self.queue
.enqueue_in_conn(conn, record)
.await
.map_err(forge_core::ForgeError::Database)
})
}
fn cancel(
&self,
job_id: Uuid,
reason: Option<String>,
) -> Pin<Box<dyn Future<Output = forge_core::Result<bool>> + Send + '_>> {
Box::pin(async move { self.cancel(job_id, reason.as_deref(), None).await })
}
}
// Integration tests for `JobDispatcher`. They are compiled only for test
// builds with the `testcontainers` feature enabled, and each test runs
// against its own isolated database obtained through `setup_db`.
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::panic,
clippy::disallowed_methods
)]
mod integration_tests {
use super::*;
use crate::jobs::registry::BoxedJobHandler;
use forge_core::testing::{IsolatedTestDb, TestDatabase};
use std::sync::Arc;
/// Creates an isolated database named after `test_name` and applies the
/// system SQL returned by `get_all_system_sql` to it.
async fn setup_db(test_name: &str) -> IsolatedTestDb {
let base = TestDatabase::from_env()
.await
.expect("Failed to create test database");
let db = base
.isolated(test_name)
.await
.expect("Failed to create isolated db");
let system_sql = crate::pg::migration::get_all_system_sql();
db.run_sql(&system_sql)
.await
.expect("Failed to apply system schema");
db
}
/// Handler that ignores its inputs and resolves to `Ok(Value::Null)`.
fn noop_handler() -> BoxedJobHandler {
Arc::new(|_ctx, _args| Box::pin(async { Ok(serde_json::Value::Null) }))
}
/// Builds a dispatcher over `pool` whose registry starts empty and is then
/// populated by the `seed` callback.
fn dispatcher_with_registry(
pool: sqlx::PgPool,
seed: impl FnOnce(&mut JobRegistry),
) -> JobDispatcher {
let queue = JobQueue::new(pool);
let mut registry = JobRegistry::new();
seed(&mut registry);
JobDispatcher::new(queue, registry)
}
/// `JobInfo` with the given name and worker capability; every other field
/// takes its `Default` value.
fn info_with(name: &'static str, capability: Option<&'static str>) -> JobInfo {
JobInfo {
name,
worker_capability: capability,
..Default::default()
}
}
// Dispatching a name missing from an empty registry must yield `NotFound`,
// and the message must mention the requested name.
#[tokio::test]
async fn dispatch_by_name_returns_not_found_for_unregistered_job() {
let db = setup_db("dispatch_unknown").await;
let dispatcher = dispatcher_with_registry(db.pool().clone(), |_| {});
let err = dispatcher
.dispatch_by_name("ghost", serde_json::json!({}), None)
.await
.expect_err("unknown job must error");
match err {
forge_core::ForgeError::NotFound(msg) => {
assert!(msg.contains("ghost"), "error must name the missing job");
}
other => panic!("expected NotFound, got {other:?}"),
}
}
// A dispatched job's row must carry the job type, the registered worker
// capability, the owner subject and the input payload.
#[tokio::test]
async fn dispatch_by_name_enqueues_with_registered_metadata() {
let db = setup_db("dispatch_capability").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |reg| {
reg.register_system("ship", info_with("ship", Some("media")), noop_handler());
});
let job_id = dispatcher
.dispatch_by_name(
"ship",
serde_json::json!({"to": "warehouse"}),
Some("u-1".into()),
)
.await
.unwrap();
// Read the stored row back directly from the `forge_jobs` table.
let row: (String, Option<String>, Option<String>, serde_json::Value) = sqlx::query_as(
"SELECT job_type, worker_capability, owner_subject, input
FROM forge_jobs WHERE id = $1",
)
.bind(job_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, "ship");
assert_eq!(row.1.as_deref(), Some("media"));
assert_eq!(row.2.as_deref(), Some("u-1"));
assert_eq!(row.3, serde_json::json!({"to": "warehouse"}));
}
// A job dispatched on a transaction that is then rolled back must leave no
// row behind.
#[tokio::test]
async fn dispatch_in_conn_only_commits_with_surrounding_tx() {
let db = setup_db("dispatch_in_conn_rollback").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |reg| {
reg.register_system("ship", info_with("ship", None), noop_handler());
});
let mut tx = pool.begin().await.unwrap();
let id = JobDispatch::dispatch_in_conn(
&dispatcher,
&mut tx,
"ship",
serde_json::json!({}),
None,
None,
)
.await
.unwrap();
tx.rollback().await.unwrap();
// Query through the pool, i.e. outside the rolled-back transaction.
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM forge_jobs WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 0, "rolled-back tx must not leave job behind");
}
// Counterpart of the rollback test: after the surrounding transaction
// commits, exactly one row exists for the returned id.
#[tokio::test]
async fn dispatch_in_conn_commits_with_surrounding_tx() {
let db = setup_db("dispatch_in_conn_commit").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |reg| {
reg.register_system("ship", info_with("ship", None), noop_handler());
});
let mut tx = pool.begin().await.unwrap();
let id = JobDispatch::dispatch_in_conn(
&dispatcher,
&mut tx,
"ship",
serde_json::json!({}),
None,
None,
)
.await
.unwrap();
tx.commit().await.unwrap();
let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM forge_jobs WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(count, 1);
}
// The in-connection dispatch path must also report `NotFound` for a job type
// the registry does not contain.
#[tokio::test]
async fn dispatch_in_conn_returns_not_found_for_unregistered_job() {
let db = setup_db("dispatch_in_conn_unknown").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |_| {});
let mut tx = pool.begin().await.unwrap();
let err = JobDispatch::dispatch_in_conn(
&dispatcher,
&mut tx,
"missing",
serde_json::json!({}),
None,
None,
)
.await
.expect_err("unknown job must error");
tx.rollback().await.unwrap();
assert!(matches!(err, forge_core::ForgeError::NotFound(_)));
}
// `get_info` returns the registered metadata for a known name and `None`
// for an unknown one.
#[tokio::test]
async fn get_info_returns_registered_info_or_none() {
let db = setup_db("dispatcher_get_info").await;
let dispatcher = dispatcher_with_registry(db.pool().clone(), |reg| {
reg.register_system("ship", info_with("ship", Some("media")), noop_handler());
});
let info = JobDispatch::get_info(&dispatcher, "ship").expect("registered");
assert_eq!(info.name, "ship");
assert_eq!(info.worker_capability, Some("media"));
assert!(JobDispatch::get_info(&dispatcher, "absent").is_none());
}
// Cancelling a freshly generated id that was never enqueued succeeds with
// `false` rather than returning an error.
#[tokio::test]
async fn cancel_returns_false_for_unknown_job() {
let db = setup_db("dispatcher_cancel_missing").await;
let dispatcher = dispatcher_with_registry(db.pool().clone(), |_| {});
let ok = dispatcher
.cancel(Uuid::new_v4(), Some("test"), None)
.await
.unwrap();
assert!(!ok);
}
// A job dispatched with owner "alice" can be cancelled when the caller
// subject is also "alice".
#[tokio::test]
async fn cancel_marks_owned_job_when_caller_matches() {
let db = setup_db("dispatcher_cancel_owner_ok").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |reg| {
reg.register_system("ship", info_with("ship", None), noop_handler());
});
let job_id = dispatcher
.dispatch_by_name("ship", serde_json::json!({}), Some("alice".into()))
.await
.unwrap();
let ok = dispatcher
.cancel(job_id, Some("user requested"), Some("alice"))
.await
.unwrap();
assert!(ok, "owner must be able to cancel their own job");
}
// A job dispatched with owner "alice" must not be cancellable by a different
// caller subject ("mallory"); the call returns `false`.
#[tokio::test]
async fn cancel_rejects_owned_job_when_caller_differs() {
let db = setup_db("dispatcher_cancel_owner_mismatch").await;
let pool = db.pool().clone();
let dispatcher = dispatcher_with_registry(pool.clone(), |reg| {
reg.register_system("ship", info_with("ship", None), noop_handler());
});
let job_id = dispatcher
.dispatch_by_name("ship", serde_json::json!({}), Some("alice".into()))
.await
.unwrap();
let ok = dispatcher
.cancel(job_id, Some("malicious"), Some("mallory"))
.await
.unwrap();
assert!(!ok, "non-owner must not be able to cancel");
}
}