Skip to main content

ferro_queue/
dispatcher.rs

1//! Job dispatching utilities.
2
3use crate::{Error, Job, JobPayload, Queue, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::sync::OnceLock;
6use std::time::Duration;
7
/// Process-wide hook used at dispatch time to look up the current tenant ID.
///
/// The hook returns `None` when a job is dispatched outside any tenant scope
/// (system jobs). While no hook has been registered, dispatching behaves as if
/// the hook returned `None`. The framework installs it once during bootstrap.
static TENANT_ID_HOOK: OnceLock<fn() -> Option<i64>> = OnceLock::new();

/// Install the tenant capture hook.
///
/// Intended to be called once while the application boots. Only the first
/// registration is kept; any later call leaves the existing hook in place and
/// reports nothing. Going through a plain function pointer lets this crate read
/// the tenant ID from task-local storage at dispatch time without depending on
/// the framework crate directly.
pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
    // `get_or_init` stores `f` only while the cell is still empty, so the first
    // registered hook always wins.
    TENANT_ID_HOOK.get_or_init(|| f);
}
21
/// A pending job dispatch.
///
/// This builder collects how a job should be dispatched (target queue, delay,
/// tenant) before it is actually sent. Building one has no effect on its own:
/// nothing runs or is enqueued until one of the `dispatch*` methods consumes it,
/// which is why the type is `#[must_use]`.
#[must_use = "a pending dispatch does nothing until it is dispatched"]
pub struct PendingDispatch<J> {
    /// The job to execute or enqueue.
    job: J,
    /// Target queue name. `None` means the connection's default queue is used.
    queue: Option<&'static str>,
    /// Delay before the job becomes available for processing, if any.
    delay: Option<Duration>,
    /// Explicit tenant ID override. When set, takes precedence over the auto-capture hook.
    tenant_id: Option<i64>,
}
33
34impl<J> PendingDispatch<J>
35where
36    J: Job + Serialize + DeserializeOwned,
37{
38    /// Create a new pending dispatch.
39    pub fn new(job: J) -> Self {
40        Self {
41            job,
42            queue: None,
43            delay: None,
44            tenant_id: None,
45        }
46    }
47
48    /// Set the queue to dispatch to.
49    pub fn on_queue(mut self, queue: &'static str) -> Self {
50        self.queue = Some(queue);
51        self
52    }
53
54    /// Set a delay before the job is available for processing.
55    pub fn delay(mut self, duration: Duration) -> Self {
56        self.delay = Some(duration);
57        self
58    }
59
60    /// Override the auto-captured tenant ID.
61    ///
62    /// Use when dispatching jobs on behalf of a tenant from a non-tenant-scoped context
63    /// (e.g., admin actions, CLI commands, system webhooks).
64    /// This explicit value takes precedence over the auto-capture hook.
65    pub fn for_tenant(mut self, tenant_id: i64) -> Self {
66        self.tenant_id = Some(tenant_id);
67        self
68    }
69
70    /// Resolve the tenant ID to attach to the job payload.
71    ///
72    /// Precedence: explicit override (for_tenant) > auto-capture hook > None.
73    fn captured_tenant_id(&self) -> Option<i64> {
74        self.tenant_id
75            .or_else(|| TENANT_ID_HOOK.get().and_then(|f| f()))
76    }
77
78    /// Dispatch the job to the queue.
79    ///
80    /// In sync mode (`QUEUE_CONNECTION=sync`), the job is executed immediately
81    /// in the current task. This is useful for development and testing.
82    ///
83    /// In redis mode (`QUEUE_CONNECTION=redis`), the job is pushed to the
84    /// Redis queue for background processing by a worker.
85    pub async fn dispatch(self) -> Result<(), Error> {
86        if QueueConfig::is_sync_mode() {
87            return self.dispatch_immediately().await;
88        }
89
90        self.dispatch_to_queue().await
91    }
92
93    /// Execute the job immediately (sync mode).
94    ///
95    /// Note: `.for_tenant()` is a no-op in sync mode. The current task's tenant context
96    /// applies directly since the job runs in the same task.
97    async fn dispatch_immediately(self) -> Result<(), Error> {
98        let job_name = self.job.name();
99
100        if self.delay.is_some() {
101            tracing::debug!(
102                job = %job_name,
103                "Job delay ignored in sync mode"
104            );
105        }
106
107        if self.tenant_id.is_some() {
108            tracing::debug!(
109                job = %job_name,
110                tenant_id = ?self.tenant_id,
111                "for_tenant() ignored in sync mode — current task tenant context applies"
112            );
113        }
114
115        tracing::debug!(job = %job_name, "Executing job synchronously");
116
117        match self.job.handle().await {
118            Ok(()) => {
119                tracing::debug!(job = %job_name, "Job completed successfully");
120                Ok(())
121            }
122            Err(e) => {
123                tracing::error!(job = %job_name, error = %e, "Job failed");
124                self.job.failed(&e).await;
125                Err(e)
126            }
127        }
128    }
129
130    /// Push the job to the Redis queue.
131    async fn dispatch_to_queue(self) -> Result<(), Error> {
132        let conn = Queue::connection();
133        let queue = self.queue.unwrap_or(&conn.config().default_queue);
134        let tenant_id = self.captured_tenant_id();
135
136        let payload = match self.delay {
137            Some(delay) => JobPayload::with_delay(&self.job, queue, delay)?,
138            None => JobPayload::new(&self.job, queue)?,
139        };
140
141        let payload = payload.with_tenant_id(tenant_id);
142
143        conn.push(payload).await
144    }
145
146    /// Dispatch the job in a background task (fire and forget).
147    ///
148    /// This spawns the dispatch as a background task, useful when you
149    /// don't need to wait for the dispatch to complete.
150    pub fn dispatch_now(self)
151    where
152        J: Send + 'static,
153    {
154        tokio::spawn(async move {
155            if let Err(e) = self.dispatch().await {
156                tracing::error!(error = %e, "Failed to dispatch job");
157            }
158        });
159    }
160
161    /// Dispatch the job synchronously (fire and forget).
162    ///
163    /// This spawns the dispatch as a background task.
164    #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
165    pub fn dispatch_sync(self)
166    where
167        J: Send + 'static,
168    {
169        self.dispatch_now()
170    }
171}
172
173/// Dispatch a job using the global queue.
174///
175/// In sync mode, the job executes immediately. In redis mode, it's
176/// queued for background processing.
177///
178/// # Example
179///
180/// ```rust,ignore
181/// use ferro_queue::{dispatch, Job, Error};
182///
183/// #[derive(Debug, Serialize, Deserialize)]
184/// struct MyJob { data: String }
185///
186/// impl Job for MyJob {
187///     async fn handle(&self) -> Result<(), Error> { Ok(()) }
188/// }
189///
190/// dispatch(MyJob { data: "hello".into() }).await?;
191/// ```
192pub async fn dispatch<J>(job: J) -> Result<(), Error>
193where
194    J: Job + Serialize + DeserializeOwned,
195{
196    PendingDispatch::new(job).dispatch().await
197}
198
199/// Dispatch a job to a specific queue.
200pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
201where
202    J: Job + Serialize + DeserializeOwned,
203{
204    PendingDispatch::new(job).on_queue(queue).dispatch().await
205}
206
207/// Dispatch a job with a delay.
208///
209/// Note: In sync mode, the delay is ignored and the job executes immediately.
210pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
211where
212    J: Job + Serialize + DeserializeOwned,
213{
214    PendingDispatch::new(job).delay(delay).dispatch().await
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_trait;
    use serial_test::serial;
    use std::env;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Guard that puts environment variables back on drop, ensuring cleanup even on panic.
    ///
    /// Each variable is restored to the value it had before the guard touched it, or
    /// removed if it was previously unset, so a value exported by the surrounding
    /// environment is not lost for the tests that run afterwards.
    struct EnvGuard {
        /// Variable name paired with the value it had before being overridden.
        vars: Vec<(String, Option<OsString>)>,
    }

    impl EnvGuard {
        /// Set `key` to `value`, remembering the previous value for restoration.
        fn set(key: &str, value: &str) -> Self {
            let previous = env::var_os(key);
            env::set_var(key, value);
            Self {
                vars: vec![(key.to_string(), previous)],
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for (key, previous) in &self.vars {
                match previous {
                    Some(value) => env::set_var(key, value),
                    None => env::remove_var(key),
                }
            }
        }
    }

    /// Job that records its execution in a shared flag.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        #[serde(skip)]
        executed: Arc<AtomicBool>,
    }

    impl TestJob {
        /// Build a job together with a handle to its "executed" flag.
        fn new() -> (Self, Arc<AtomicBool>) {
            let executed = Arc::new(AtomicBool::new(false));
            (
                Self {
                    executed: executed.clone(),
                },
                executed,
            )
        }
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            self.executed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Job whose handler always fails.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FailingJob;

    #[async_trait]
    impl Job for FailingJob {
        async fn handle(&self) -> Result<(), Error> {
            Err(Error::job_failed("FailingJob", "intentional failure"))
        }
    }

    // --- Sync-mode dispatch tests (serialised: they mutate QUEUE_CONNECTION) ---

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_executes_immediately() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();
        assert!(!executed.load(Ordering::SeqCst));

        let result = PendingDispatch::new(job).dispatch().await;
        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_handles_failure() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let result = PendingDispatch::new(FailingJob).dispatch().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_delay() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let start = std::time::Instant::now();
        let result = PendingDispatch::new(job)
            .delay(Duration::from_secs(10))
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
        // Should complete quickly, not wait 10 seconds
        assert!(start.elapsed() < Duration::from_secs(5));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_queue() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let result = PendingDispatch::new(job)
            .on_queue("high-priority")
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    // --- Tenant capture tests ---
    //
    // TENANT_ID_HOOK is a process-wide OnceLock that cannot be reset, so these tests
    // only assert properties that hold regardless of which test registered the hook first.

    #[test]
    fn test_for_tenant_stores_explicit_override() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        assert_eq!(pending.tenant_id, Some(99));
    }

    #[test]
    fn test_for_tenant_explicit_wins_over_hook() {
        // Even if a hook is registered, explicit for_tenant() takes precedence.
        // Another test in this module may already have registered a hook returning
        // Some(42), but for_tenant(99) overrides it via captured_tenant_id() precedence.
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        assert_eq!(pending.captured_tenant_id(), Some(99));
    }

    #[test]
    fn test_no_tenant_id_by_default() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        assert_eq!(pending.tenant_id, None);
    }

    #[test]
    fn test_hook_registration_second_call_is_noop() {
        // OnceLock ignores the second set(): the first registration wins.
        register_tenant_capture_hook(|| Some(42));
        register_tenant_capture_hook(|| Some(999)); // silently ignored

        let result = TENANT_ID_HOOK.get().map(|f| f());
        // Whichever test registered first, a hook is installed by now...
        assert!(
            result.is_some(),
            "a hook must be registered after the first call"
        );
        // ...and it can never be the one from the second call above.
        assert_ne!(
            result,
            Some(Some(999)),
            "second registration must not replace the first"
        );
    }

    #[test]
    fn test_hook_registration_captures_at_dispatch_time() {
        // Register a hook returning Some(42). If another hook was installed first,
        // this call is ignored and that earlier hook stays in effect.
        register_tenant_capture_hook(|| Some(42));

        // With no explicit for_tenant(), captured_tenant_id() consults the hook.
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        let captured = pending.captured_tenant_id();

        // Every hook registered in this module returns Some(42); None is tolerated in
        // case a hook registered elsewhere in the test binary won the race.
        assert!(captured.is_none() || captured == Some(42));
    }
}