ferro-queue 0.2.56

Background job queue system for Ferro framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! Job dispatching utilities.

use crate::{Error, Job, QueueConfig};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::OnceLock;
use std::time::Duration;

/// Global hook called at dispatch time to capture the current tenant ID.
/// Returns None when dispatching outside any tenant scope (system jobs).
/// Registered once during application bootstrap by the framework.
static TENANT_ID_HOOK: OnceLock<fn() -> Option<i64>> = OnceLock::new();

/// Register the tenant capture hook.
///
/// Called once during application bootstrap. Re-registration is silently ignored.
/// The hook is invoked at dispatch time to capture the current tenant ID from
/// task-local storage without requiring a direct dependency on the framework crate.
pub fn register_tenant_capture_hook(f: fn() -> Option<i64>) {
    let _ = TENANT_ID_HOOK.set(f);
}

/// A pending job dispatch.
///
/// This builder allows configuring how a job is dispatched before
/// actually sending it to the queue.
pub struct PendingDispatch<J> {
    job: J,
    queue: Option<&'static str>,
    delay: Option<Duration>,
    /// Explicit tenant ID override. When set, takes precedence over the auto-capture hook.
    tenant_id: Option<i64>,
}

impl<J> PendingDispatch<J>
where
    J: Job + Serialize + DeserializeOwned,
{
    /// Create a new pending dispatch.
    pub fn new(job: J) -> Self {
        Self {
            job,
            queue: None,
            delay: None,
            tenant_id: None,
        }
    }

    /// Set the queue to dispatch to.
    pub fn on_queue(mut self, queue: &'static str) -> Self {
        self.queue = Some(queue);
        self
    }

    /// Set a delay before the job is available for processing.
    pub fn delay(mut self, duration: Duration) -> Self {
        self.delay = Some(duration);
        self
    }

    /// Override the auto-captured tenant ID.
    ///
    /// Use when dispatching jobs on behalf of a tenant from a non-tenant-scoped context
    /// (e.g., admin actions, CLI commands, system webhooks).
    /// This explicit value takes precedence over the auto-capture hook.
    pub fn for_tenant(mut self, tenant_id: i64) -> Self {
        self.tenant_id = Some(tenant_id);
        self
    }

    /// Resolve the tenant ID to attach to the job payload.
    ///
    /// Precedence: explicit override (for_tenant) > auto-capture hook > None.
    fn captured_tenant_id(&self) -> Option<i64> {
        self.tenant_id
            .or_else(|| TENANT_ID_HOOK.get().and_then(|f| f()))
    }

    /// Dispatch the job to the queue.
    ///
    /// In sync mode (`QUEUE_CONNECTION=sync`), the job is executed immediately
    /// in the current task. This is useful for development and testing.
    ///
    /// Otherwise, the job is inserted into the `jobs` table for background
    /// processing by a `WorkerLoop`.
    pub async fn dispatch(self) -> Result<(), Error> {
        if QueueConfig::is_sync_mode() {
            return self.dispatch_immediately().await;
        }

        self.dispatch_to_queue().await
    }

    /// Execute the job immediately (sync mode).
    ///
    /// Note: `.for_tenant()` is a no-op in sync mode. The current task's tenant context
    /// applies directly since the job runs in the same task.
    async fn dispatch_immediately(self) -> Result<(), Error> {
        let job_name = self.job.name();

        if self.delay.is_some() {
            tracing::debug!(
                job = %job_name,
                "Job delay ignored in sync mode"
            );
        }

        if self.tenant_id.is_some() {
            tracing::debug!(
                job = %job_name,
                tenant_id = ?self.tenant_id,
                "for_tenant() ignored in sync mode — current task tenant context applies"
            );
        }

        tracing::debug!(job = %job_name, "Executing job synchronously");

        match self.job.handle().await {
            Ok(()) => {
                tracing::debug!(job = %job_name, "Job completed successfully");
                Ok(())
            }
            Err(e) => {
                tracing::error!(job = %job_name, error = %e, "Job failed");
                self.job.failed(&e).await;
                Err(e)
            }
        }
    }

    /// Push the job to the database queue via `db::enqueue`.
    async fn dispatch_to_queue(self) -> Result<(), Error> {
        let conn = crate::db::Queue::connection();
        let queue = self.queue.unwrap_or("default");
        let tenant_id = self.captured_tenant_id();
        let now = chrono::Utc::now();
        let available_at = match self.delay {
            Some(d) => now + chrono::Duration::from_std(d).unwrap_or_default(),
            None => now,
        };
        let payload = serde_json::to_string(&self.job)
            .map_err(|e| Error::SerializationFailed(e.to_string()))?;
        let job_type = self.job.name().to_string();
        let max_retries = self.job.max_retries();
        let idempotency_key = self.job.idempotency_key();
        crate::db::enqueue(
            conn,
            queue,
            &job_type,
            &payload,
            max_retries,
            idempotency_key.as_deref(),
            tenant_id,
            available_at,
        )
        .await
    }

    /// Dispatch the job in a background task (fire and forget).
    ///
    /// This spawns the dispatch as a background task, useful when you
    /// don't need to wait for the dispatch to complete.
    pub fn dispatch_now(self)
    where
        J: Send + 'static,
    {
        tokio::spawn(async move {
            if let Err(e) = self.dispatch().await {
                tracing::error!(error = %e, "Failed to dispatch job");
            }
        });
    }

    /// Dispatch the job synchronously (fire and forget).
    ///
    /// This spawns the dispatch as a background task.
    #[deprecated(since = "0.2.0", note = "Use dispatch_now() instead")]
    pub fn dispatch_sync(self)
    where
        J: Send + 'static,
    {
        self.dispatch_now()
    }
}

/// Dispatch a job using the global queue.
///
/// In sync mode, the job executes immediately. Otherwise, it is inserted
/// into the `jobs` table for background processing by a `WorkerLoop`.
///
/// # Example
///
/// ```rust,ignore
/// use ferro_queue::{dispatch, Job, Error};
///
/// #[derive(Debug, Serialize, Deserialize)]
/// struct MyJob { data: String }
///
/// impl Job for MyJob {
///     async fn handle(&self) -> Result<(), Error> { Ok(()) }
/// }
///
/// dispatch(MyJob { data: "hello".into() }).await?;
/// ```
pub async fn dispatch<J>(job: J) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    PendingDispatch::new(job).dispatch().await
}

/// Dispatch a job to a specific queue.
pub async fn dispatch_to<J>(job: J, queue: &'static str) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    PendingDispatch::new(job).on_queue(queue).dispatch().await
}

/// Dispatch a job with a delay.
///
/// Note: In sync mode, the delay is ignored and the job executes immediately.
pub async fn dispatch_later<J>(job: J, delay: Duration) -> Result<(), Error>
where
    J: Job + Serialize + DeserializeOwned,
{
    PendingDispatch::new(job).delay(delay).dispatch().await
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_trait;
    use serial_test::serial;
    use std::env;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Guard that removes environment variables on drop, ensuring cleanup even on panic.
    struct EnvGuard {
        vars: Vec<String>,
    }

    impl EnvGuard {
        fn set(key: &str, value: &str) -> Self {
            env::set_var(key, value);
            Self {
                vars: vec![key.to_string()],
            }
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            for var in &self.vars {
                env::remove_var(var);
            }
        }
    }

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct TestJob {
        #[serde(skip)]
        executed: Arc<AtomicBool>,
    }

    impl TestJob {
        fn new() -> (Self, Arc<AtomicBool>) {
            let executed = Arc::new(AtomicBool::new(false));
            (
                Self {
                    executed: executed.clone(),
                },
                executed,
            )
        }
    }

    #[async_trait]
    impl Job for TestJob {
        async fn handle(&self) -> Result<(), Error> {
            self.executed.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
    struct FailingJob;

    #[async_trait]
    impl Job for FailingJob {
        async fn handle(&self) -> Result<(), Error> {
            Err(Error::job_failed("FailingJob", "intentional failure"))
        }
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_executes_immediately() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();
        assert!(!executed.load(Ordering::SeqCst));

        let result = PendingDispatch::new(job).dispatch().await;
        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_handles_failure() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let result = PendingDispatch::new(FailingJob).dispatch().await;
        assert!(result.is_err());
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_delay() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let start = std::time::Instant::now();
        let result = PendingDispatch::new(job)
            .delay(Duration::from_secs(10))
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
        // Should complete quickly, not wait 10 seconds
        assert!(start.elapsed() < Duration::from_secs(5));
    }

    #[tokio::test]
    #[serial]
    async fn test_sync_mode_ignores_queue() {
        let _guard = EnvGuard::set("QUEUE_CONNECTION", "sync");

        let (job, executed) = TestJob::new();

        let result = PendingDispatch::new(job)
            .on_queue("high-priority")
            .dispatch()
            .await;

        assert!(result.is_ok());
        assert!(executed.load(Ordering::SeqCst));
    }

    // --- Task 2 tests ---

    #[test]
    fn test_for_tenant_stores_explicit_override() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        assert_eq!(pending.tenant_id, Some(99));
    }

    #[test]
    fn test_for_tenant_explicit_wins_over_hook() {
        // Even if a hook is registered, explicit for_tenant() takes precedence.
        // The hook may return Some(42) from test_hook_registered_once (since OnceLock),
        // but for_tenant(99) overrides it via captured_tenant_id() precedence logic.
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job).for_tenant(99);
        // captured_tenant_id() returns explicit override first
        assert_eq!(pending.captured_tenant_id(), Some(99));
    }

    #[test]
    fn test_no_tenant_id_by_default() {
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        assert_eq!(pending.tenant_id, None);
    }

    #[test]
    fn test_hook_registration_second_call_is_noop() {
        // OnceLock ignores the second set() — first registration wins.
        // We register once and verify calling register_tenant_capture_hook again doesn't panic.
        register_tenant_capture_hook(|| Some(42));
        register_tenant_capture_hook(|| Some(999)); // silently ignored
                                                    // If the hook was set to 42 first, it remains 42
        let result = TENANT_ID_HOOK.get().map(|f| f());
        // We can't guarantee the value here due to test ordering, but it must not be 999
        // since OnceLock only accepts the first write.
        // Just assert no panic occurred.
        let _ = result;
    }

    #[test]
    fn test_hook_registration_captures_at_dispatch_time() {
        // Register hook returning Some(42) — subsequent calls to captured_tenant_id()
        // without an explicit override will return Some(42) from the hook.
        register_tenant_capture_hook(|| Some(42));
        // With no explicit for_tenant(), hook is consulted
        let (job, _) = TestJob::new();
        let pending = PendingDispatch::new(job);
        // captured_tenant_id() consults hook when no explicit override set
        let captured = pending.captured_tenant_id();
        // Could be Some(42) if hook registered, or None if hook was already set by prior test
        // We only assert no panic and that the explicit override path still works.
        assert!(captured.is_none() || captured == Some(42));
    }
}