Skip to main content

ferro_queue/
dispatcher.rs

1//! Job dispatching utilities.
2
3use crate::{Error, Job, QueueConfig};
4use serde::{de::DeserializeOwned, Serialize};
5use std::sync::OnceLock;
6use std::time::Duration;
7
/// Signature of the tenant capture hook: yields the current tenant ID, if any.
type TenantCaptureHook = fn() -> Option<i64>;

/// Process-wide slot holding the tenant capture hook.
///
/// The stored function is consulted at dispatch time to find out which tenant
/// the dispatching code is running for. A hook result of `None` means the
/// dispatch happened outside any tenant scope (system jobs).
/// The slot is write-once: the framework fills it during application bootstrap.
static TENANT_ID_HOOK: OnceLock<TenantCaptureHook> = OnceLock::new();
12
13/// Register the tenant capture hook.
14///
15/// Called once during application bootstrap. Re-registration is silently ignored.
16/// The hook is invoked at dispatch time to capture the current tenant ID from
17/// task-local storage without requiring a direct dependency on the framework crate.
18pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
19    let _ = TENANT_ID_HOOK.set(f);
20}
21
/// A pending job dispatch.
///
/// This builder collects the dispatch options for a job (target queue, delay,
/// tenant override) before the job is actually sent. Nothing happens until one
/// of the dispatch methods consumes the builder, so a value that is built and
/// then dropped never reaches the queue.
#[must_use = "a pending dispatch does nothing until it is dispatched"]
pub struct PendingDispatch<J> {
    /// The job to execute or enqueue.
    job: J,
    /// Target queue name. `None` means no queue was chosen explicitly.
    queue: Option<&'static str>,
    /// Delay before the job becomes available for processing. `None` means no delay.
    delay: Option<Duration>,
    /// Explicit tenant ID override. When set, takes precedence over the auto-capture hook.
    tenant_id: Option<i64>,
}
33
34impl<J> PendingDispatch<J>
35where
36    J: Job + Serialize + DeserializeOwned,
37{
38    /// Create a new pending dispatch.
39    pub fn new(job: J) -> Self {
40        Self {
41            job,
42            queue: None,
43            delay: None,
44            tenant_id: None,
45        }
46    }
47
48    /// Set the queue to dispatch to.
49    pub fn on_queue(mut self, queue: &'static str) -> Self {
50        self.queue = Some(queue);
51        self
52    }
53
54    /// Set a delay before the job is available for processing.
55    pub fn delay(mut self, duration: Duration) -> Self {
56        self.delay = Some(duration);
57        self
58    }
59
60    /// Override the auto-captured tenant ID.
61    ///
62    /// Use when dispatching jobs on behalf of a tenant from a non-tenant-scoped context
63    /// (e.g., admin actions, CLI commands, system webhooks).
64    /// This explicit value takes precedence over the auto-capture hook.
65    pub fn for_tenant(mut self, tenant_id: i64) -> Self {
66        self.tenant_id = Some(tenant_id);
67        self
68    }
69
70    /// Resolve the tenant ID to attach to the job payload.
71    ///
72    /// Precedence: explicit override (for_tenant) > auto-capture hook > None.
73    fn captured_tenant_id(&self) -> Option<i64> {
74        self.tenant_id
75            .or_else(|| TENANT_ID_HOOK.get().and_then(|f| f()))
76    }
77
78    /// Dispatch the job to the queue.
79    ///
80    /// In sync mode (`QUEUE_CONNECTION=sync`), the job is executed immediately
81    /// in the current task. This is useful for development and testing.
82    ///
83    /// Otherwise, the job is inserted into the `jobs` table for background
84    /// processing by a `WorkerLoop`.
85    pub async fn dispatch(self) -> Result<(), Error> {
86        if QueueConfig::is_sync_mode() {
87            return self.dispatch_immediately().await;
88        }
89
90        self.dispatch_to_queue().await
91    }
92
93    /// Execute the job immediately (sync mode).
94    ///
95    /// Note: `.for_tenant()` is a no-op in sync mode. The current task's tenant context
96    /// applies directly since the job runs in the same task.
97    async fn dispatch_immediately(self) -> Result<(), Error> {
98        let job_name = self.job.name();
99
100        if self.delay.is_some() {
101            tracing::debug!(
102                job = %job_name,
103                "Job delay ignored in sync mode"
104            );
105        }
106
107        if self.tenant_id.is_some() {
108            tracing::debug!(
109                job = %job_name,
110                tenant_id = ?self.tenant_id,
111                "for_tenant() ignored in sync mode — current task tenant context applies"
112            );
113        }
114
115        tracing::debug!(job = %job_name, "Executing job synchronously");
116
117        match self.job.handle().await {
118            Ok(()) => {
119                tracing::debug!(job = %job_name, "Job completed successfully");
120                Ok(())
121            }
122            Err(e) => {
123                tracing::error!(job = %job_name, error = %e, "Job failed");
124                self.job.failed(&e).await;
125                Err(e)
126            }
127        }
128    }
129
130    /// Push the job to the database queue via `db::enqueue`.
131    async fn dispatch_to_queue(self) -> Result<(), Error> {
132        let conn = crate::db::Queue::connection();
133        let queue = self.queue.unwrap_or("default");
134        let tenant_id = self.captured_tenant_id();
135        let now = chrono::Utc::now();
136        let available_at = match self.delay {
137            Some(d) => now + chrono::Duration::from_std(d).unwrap_or_default(),
138            None => now,
139        };
140        let payload = serde_json::to_string(&self.job)
141            .map_err(|e| Error::SerializationFailed(e.to_string()))?;
142        let job_type = self.job.name().to_string();
143        let max_retries = self.job.max_retries();
144        let idempotency_key = self.job.idempotency_key();
145        crate::db::enqueue(
146            conn,
147            queue,
148            &job_type,
149            &payload,
150            max_retries,
151            idempotency_key.as_deref(),
152            tenant_id,
153            available_at,
154        )
155        .await
156    }
157
158    /// Dispatch the job in a background task (fire and forget).
159    ///
160    /// This spawns the dispatch as a background task, useful when you
161    /// don't need to wait for the dispatch to complete.
162    pub fn dispatch_now(self)
163    where
164        J: Send + 'static,
165    {
166        tokio::spawn(async move {
167            if let Err(e) = self.dispatch().await {
168                tracing::error!(error = %e, "Failed to dispatch job");
169            }
170        });
171    }
172
173    /// Dispatch the job synchronously (fire and forget).
174    ///
175    /// This spawns the dispatch as a background task.
176    #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
177    pub fn dispatch_sync(self)
178    where
179        J: Send + 'static,
180    {
181        self.dispatch_now()
182    }
183}
184
185/// Dispatch a job using the global queue.
186///
187/// In sync mode, the job executes immediately. Otherwise, it is inserted
188/// into the `jobs` table for background processing by a `WorkerLoop`.
189///
190/// # Example
191///
192/// ```rust,ignore
193/// use ferro_queue::{dispatch, Job, Error};
194///
195/// #[derive(Debug, Serialize, Deserialize)]
196/// struct MyJob { data: String }
197///
198/// impl Job for MyJob {
199///     async fn handle(&self) -> Result<(), Error> { Ok(()) }
200/// }
201///
202/// dispatch(MyJob { data: "hello".into() }).await?;
203/// ```
204pub async fn dispatch<J>(job: J) -> Result<(), Error>
205where
206    J: Job + Serialize + DeserializeOwned,
207{
208    PendingDispatch::new(job).dispatch().await
209}
210
211/// Dispatch a job to a specific queue.
212pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
213where
214    J: Job + Serialize + DeserializeOwned,
215{
216    PendingDispatch::new(job).on_queue(queue).dispatch().await
217}
218
219/// Dispatch a job with a delay.
220///
221/// Note: In sync mode, the delay is ignored and the job executes immediately.
222pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
223where
224    J: Job + Serialize + DeserializeOwned,
225{
226    PendingDispatch::new(job).delay(delay).dispatch().await
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_trait;
    use serial_test::serial;
    use std::env;
    use std::ffi::OsString;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Guard that sets an environment variable and puts back its previous
    /// state on drop, ensuring cleanup even on panic.
    ///
    /// Restoring (rather than unconditionally removing) keeps a value that was
    /// already present before the test from being wiped out.
    struct EnvGuard {
        key: String,
        /// Value the variable held before the guard overwrote it, if any.
        previous: Option<OsString>,
    }

    impl EnvGuard {
        /// Set `key` to `value`, remembering whatever was there before.
        fn set(key: &str, value: &str) -> Self {
            let previous = env::var_os(key);
            env::set_var(key, value);
            Self {
                key: key.to_string(),
                previous,
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => env::set_var(&self.key, value),
                None => env::remove_var(&self.key),
            }
        }
    }

    /// Job that records its execution in a shared flag.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        #[serde(skip)]
        executed: Arc<AtomicBool>,
    }

    impl TestJob {
        /// Build a job together with a handle to its "executed" flag.
        fn new() -> (Self, Arc<AtomicBool>) {
            let executed = Arc::new(AtomicBool::new(false));
            (
                Self {
                    executed: executed.clone(),
                },
                executed,
            )
        }
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            self.executed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Job whose handler always fails.
    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FailingJob;

    #[async_trait]
    impl Job for FailingJob {
        async fn handle(&self) -> Result<(), Error> {
            Err(Error::job_failed("FailingJob", "intentional failure"))
        }
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_executes_immediately() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();
        assert!(!executed.load(Ordering::SeqCst));

        let result = PendingDispatch::new(job).dispatch().await;
        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_handles_failure() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let result = PendingDispatch::new(FailingJob).dispatch().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_delay() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let start = std::time::Instant::now();
        let result = PendingDispatch::new(job)
            .delay(Duration::from_secs(10))
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
        // Should complete quickly, not wait 10 seconds
        assert!(start.elapsed() < Duration::from_secs(5));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_queue() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let result = PendingDispatch::new(job)
            .on_queue("high-priority")
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    // --- Task 2 tests ---

    #[test]
    fn test_for_tenant_stores_explicit_override() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        assert_eq!(pending.tenant_id, Some(99));
    }

    #[test]
    fn test_for_tenant_explicit_wins_over_hook() {
        // Even if a hook is registered, explicit for_tenant() takes precedence.
        // One of the hook registration tests below may already have installed a
        // hook returning Some(42) (the OnceLock is shared by the whole test
        // binary), but for_tenant(99) overrides it via captured_tenant_id().
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        // captured_tenant_id() returns explicit override first
        assert_eq!(pending.captured_tenant_id(), Some(99));
    }

    #[test]
    fn test_no_tenant_id_by_default() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        assert_eq!(pending.tenant_id, None);
    }

    #[test]
    fn test_hook_registration_second_call_is_noop() {
        // OnceLock ignores the second set() — first registration wins.
        // Another test may have installed the hook already, so the concrete
        // value is not assumed; only that a second registration changes nothing.
        register_tenant_capture_hook(|| Some(42));
        let before = TENANT_ID_HOOK.get().map(|f| f());

        register_tenant_capture_hook(|| Some(999)); // silently ignored
        let after = TENANT_ID_HOOK.get().map(|f| f());

        assert!(
            before.is_some(),
            "a hook must be installed after the first registration"
        );
        assert_eq!(
            before, after,
            "re-registration must not replace the installed hook"
        );
    }

    #[test]
    fn test_hook_registration_captures_at_dispatch_time() {
        // Register hook returning Some(42) — subsequent calls to captured_tenant_id()
        // without an explicit override consult whichever hook is installed.
        register_tenant_capture_hook(|| Some(42));
        // With no explicit for_tenant(), hook is consulted
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        let captured = pending.captured_tenant_id();
        // Some(42) when this module's hook is the installed one; a hook installed
        // earlier by other code could legitimately yield None instead.
        assert!(captured.is_none() || captured == Some(42));
    }
}