use crate::{Error, Job, JobPayload, Queue, QueueConfig};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::OnceLock;
use std::time::Duration;
/// Process-wide hook consulted to obtain a tenant id when a job is dispatched
/// without an explicit `for_tenant()` override.
///
/// Stored in a `OnceLock`, so it can be populated at most once.
static TENANT_ID_HOOK: OnceLock<fn() -> Option<i64>> = OnceLock::new();

/// Installs `f` as the tenant-capture hook.
///
/// Only the first registration takes effect; any later call leaves the
/// already-installed hook in place and is silently ignored.
pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
    // `get_or_init` stores `f` only while the cell is still empty; the returned
    // reference to the stored hook is deliberately discarded.
    let _ = TENANT_ID_HOOK.get_or_init(|| f);
}
/// Builder describing how a single job should be dispatched.
///
/// Nothing happens until the builder is consumed by `dispatch()` or
/// `dispatch_now()`; dropping it discards the job, hence `#[must_use]`.
#[must_use = "a pending dispatch does nothing until `dispatch()` is awaited or `dispatch_now()` is called"]
pub struct PendingDispatch<J> {
    /// The job to execute or enqueue.
    job: J,
    /// Queue name override; `None` falls back to the connection's default queue.
    queue: Option<&'static str>,
    /// Delay handed to the queue payload; only logged (not applied) in sync mode.
    delay: Option<Duration>,
    /// Explicit tenant set via `for_tenant()`; when `None`, the registered
    /// tenant-capture hook is consulted at queue-dispatch time.
    tenant_id: Option<i64>,
}
impl<J> PendingDispatch<J>
where
J: Job + Serialize + DeserializeOwned,
{
/// Starts a dispatch builder for `job`.
///
/// No queue override, delay, or explicit tenant is set initially.
pub fn new(job: J) -> Self {
    let (queue, delay, tenant_id) = (None, None, None);
    Self {
        job,
        queue,
        delay,
        tenant_id,
    }
}
pub fn on_queue(mut self, queue: &'static str) -> Self {
self.queue = Some(queue);
self
}
pub fn delay(mut self, duration: Duration) -> Self {
self.delay = Some(duration);
self
}
pub fn for_tenant(mut self, tenant_id: i64) -> Self {
self.tenant_id = Some(tenant_id);
self
}
/// Resolves the tenant id to attach to the queued payload.
///
/// An explicit `for_tenant()` value wins. Otherwise the registered hook, if
/// any, is invoked and its answer is returned; with no hook the result is `None`.
fn captured_tenant_id(&self) -> Option<i64> {
    match self.tenant_id {
        explicit @ Some(_) => explicit,
        None => {
            let hook = TENANT_ID_HOOK.get()?;
            hook()
        }
    }
}
/// Dispatches the job.
///
/// When `QueueConfig::is_sync_mode()` reports sync mode the job runs inline;
/// otherwise it is pushed onto the queue connection.
///
/// # Errors
///
/// Returns whatever error the inline execution or the queue push produces.
pub async fn dispatch(self) -> Result<(), Error> {
    if QueueConfig::is_sync_mode() {
        self.dispatch_immediately().await
    } else {
        self.dispatch_to_queue().await
    }
}
async fn dispatch_immediately(self) -> Result<(), Error> {
let job_name = self.job.name();
if self.delay.is_some() {
tracing::debug!(
job = %job_name,
"Job delay ignored in sync mode"
);
}
if self.tenant_id.is_some() {
tracing::debug!(
job = %job_name,
tenant_id = ?self.tenant_id,
"for_tenant() ignored in sync mode — current task tenant context applies"
);
}
tracing::debug!(job = %job_name, "Executing job synchronously");
match self.job.handle().await {
Ok(()) => {
tracing::debug!(job = %job_name, "Job completed successfully");
Ok(())
}
Err(e) => {
tracing::error!(job = %job_name, error = %e, "Job failed");
self.job.failed(&e).await;
Err(e)
}
}
}
/// Serializes the job into a payload and pushes it onto the queue connection.
///
/// The target queue is the `on_queue()` override or, failing that, the
/// connection's configured default queue. The tenant id is resolved via
/// `captured_tenant_id()` and attached to the payload.
async fn dispatch_to_queue(self) -> Result<(), Error> {
    let connection = Queue::connection();
    let target_queue = self.queue.unwrap_or(&connection.config().default_queue);
    // Resolve the tenant before building the payload, so the hook is consulted
    // even if payload construction fails afterwards.
    let tenant = self.captured_tenant_id();
    let payload = if let Some(delay) = self.delay {
        JobPayload::with_delay(&self.job, target_queue, delay)?
    } else {
        JobPayload::new(&self.job, target_queue)?
    };
    connection.push(payload.with_tenant_id(tenant)).await
}
pub fn dispatch_now(self)
where
J: Send + 'static,
{
tokio::spawn(async move {
if let Err(e) = self.dispatch().await {
tracing::error!(error = %e, "Failed to dispatch job");
}
});
}
#[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
pub fn dispatch_sync(self)
where
J: Send + 'static,
{
self.dispatch_now()
}
}
/// Dispatches `job` with default settings (no queue override, no delay).
///
/// Shorthand for `PendingDispatch::new(job).dispatch()`.
///
/// # Errors
///
/// Propagates the error returned by `PendingDispatch::dispatch`.
pub async fn dispatch<J>(job: J) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    let pending = PendingDispatch::new(job);
    pending.dispatch().await
}
/// Dispatches `job` onto the named `queue`.
///
/// Shorthand for `PendingDispatch::new(job).on_queue(queue).dispatch()`.
///
/// # Errors
///
/// Propagates the error returned by `PendingDispatch::dispatch`.
pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    let pending = PendingDispatch::new(job).on_queue(queue);
    pending.dispatch().await
}
/// Dispatches `job` with the given `delay`.
///
/// Shorthand for `PendingDispatch::new(job).delay(delay).dispatch()`.
///
/// # Errors
///
/// Propagates the error returned by `PendingDispatch::dispatch`.
pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    let pending = PendingDispatch::new(job).delay(delay);
    pending.dispatch().await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::async_trait;
use serial_test::serial;
use std::env;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Guard that overrides environment variables for the duration of a test and
/// puts the previous state back when dropped.
struct EnvGuard {
    /// Each overridden key with the value it held beforehand (`None` = was unset).
    vars: Vec<(String, Option<std::ffi::OsString>)>,
}
impl EnvGuard {
    /// Sets `key` to `value`, remembering whatever `key` held before.
    fn set(key: &str, value: &str) -> Self {
        // `var_os` keeps a previous value intact even if it is not valid UTF-8.
        let previous = env::var_os(key);
        env::set_var(key, value);
        Self {
            vars: vec![(key.to_string(), previous)],
        }
    }
}
impl Drop for EnvGuard {
    /// Restores every overridden variable: a pre-existing value is written
    /// back, and a variable that did not exist before is removed again.
    fn drop(&mut self) {
        // Undo in reverse order so stacked overrides unwind correctly.
        for (key, previous) in self.vars.drain(..).rev() {
            match previous {
                Some(original) => env::set_var(&key, original),
                None => env::remove_var(&key),
            }
        }
    }
}
/// Job that records whether `handle()` ran by flipping a shared flag.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct TestJob {
    /// Shared with the test body; excluded from (de)serialization.
    #[serde(skip)]
    executed: Arc<AtomicBool>,
}
impl TestJob {
    /// Returns a fresh job plus a handle to its "executed" flag (initially `false`).
    fn new() -> (Self, Arc<AtomicBool>) {
        let flag = Arc::new(AtomicBool::new(false));
        let job = Self {
            executed: Arc::clone(&flag),
        };
        (job, flag)
    }
}
#[async_trait]
impl Job for TestJob {
    /// Marks the job as executed and reports success.
    async fn handle(&self) -> Result<(), Error> {
        let ran = &self.executed;
        ran.store(true, Ordering::SeqCst);
        Ok(())
    }
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FailingJob;
#[async_trait]
impl Job for FailingJob {
async fn handle(&self) -> Result<(), Error> {
Err(Error::job_failed("FailingJob", "intentional failure"))
}
}
/// With `QUEUE_CONNECTION=sync`, dispatching runs the job before `dispatch()` returns.
#[tokio::test]
#[serial]
async fn test_sync_mode_executes_immediately() {
    let _env = EnvGuard::set("QUEUE_CONNECTION", "sync");
    let (job, ran) = TestJob::new();
    assert!(!ran.load(Ordering::SeqCst));

    let pending = PendingDispatch::new(job);
    let outcome = pending.dispatch().await;

    assert!(outcome.is_ok());
    assert!(ran.load(Ordering::SeqCst));
}
/// With `QUEUE_CONNECTION=sync`, a failing job's error is returned from `dispatch()`.
#[tokio::test]
#[serial]
async fn test_sync_mode_handles_failure() {
    let _env = EnvGuard::set("QUEUE_CONNECTION", "sync");

    let pending = PendingDispatch::new(FailingJob);
    let outcome = pending.dispatch().await;

    assert!(outcome.is_err());
}
/// With `QUEUE_CONNECTION=sync`, a 10-second delay does not postpone execution.
#[tokio::test]
#[serial]
async fn test_sync_mode_ignores_delay() {
    let _env = EnvGuard::set("QUEUE_CONNECTION", "sync");
    let (job, ran) = TestJob::new();

    let started = std::time::Instant::now();
    let pending = PendingDispatch::new(job).delay(Duration::from_secs(10));
    let outcome = pending.dispatch().await;

    assert!(outcome.is_ok());
    assert!(ran.load(Ordering::SeqCst));
    // Well under the requested delay: the job must not have waited for it.
    assert!(started.elapsed() < Duration::from_secs(5));
}
/// With `QUEUE_CONNECTION=sync`, a queue override does not stop the job from running inline.
#[tokio::test]
#[serial]
async fn test_sync_mode_ignores_queue() {
    let _env = EnvGuard::set("QUEUE_CONNECTION", "sync");
    let (job, ran) = TestJob::new();

    let pending = PendingDispatch::new(job).on_queue("high-priority");
    let outcome = pending.dispatch().await;

    assert!(outcome.is_ok());
    assert!(ran.load(Ordering::SeqCst));
}
/// `for_tenant()` records the given id on the builder.
#[test]
fn test_for_tenant_stores_explicit_override() {
    let (job, _flag) = TestJob::new();
    let builder = PendingDispatch::new(job).for_tenant(99);
    let stored = builder.tenant_id;
    assert_eq!(stored, Some(99));
}
/// An explicit tenant id is what `captured_tenant_id()` resolves to, whether
/// or not a hook happens to be registered.
#[test]
fn test_for_tenant_explicit_wins_over_hook() {
    let (job, _flag) = TestJob::new();
    let builder = PendingDispatch::new(job).for_tenant(99);
    let resolved = builder.captured_tenant_id();
    assert_eq!(resolved, Some(99));
}
/// A freshly created builder carries no explicit tenant id.
#[test]
fn test_no_tenant_id_by_default() {
    let (job, _flag) = TestJob::new();
    let builder = PendingDispatch::new(job);
    let stored = builder.tenant_id;
    assert_eq!(stored, None);
}
/// A second registration must not replace the hook that is already installed.
///
/// The hook lives in a process-wide static, so another test may have installed
/// its own hook first. The check is therefore on identity (the stored function
/// pointer is unchanged), not on a specific return value.
#[test]
fn test_hook_registration_second_call_is_noop() {
    register_tenant_capture_hook(|| Some(42));
    let before = TENANT_ID_HOOK.get().map(|f| *f as usize);
    assert!(
        before.is_some(),
        "a hook must be installed after the first registration"
    );

    register_tenant_capture_hook(|| Some(999));
    let after = TENANT_ID_HOOK.get().map(|f| *f as usize);

    assert_eq!(
        before, after,
        "second registration must not replace the installed hook"
    );
}
/// Without an explicit tenant, `captured_tenant_id()` defers to the registered hook.
///
/// The hook is process-wide and first-registration-wins, so the assertion
/// tolerates a hook installed elsewhere that yields `None`.
#[test]
fn test_hook_registration_captures_at_dispatch_time() {
    register_tenant_capture_hook(|| Some(42));
    let (job, _flag) = TestJob::new();
    let builder = PendingDispatch::new(job);
    let resolved = builder.captured_tenant_id();
    assert!(matches!(resolved, None | Some(42)));
}
}