Skip to main content

ferro_queue/
worker.rs

1//! DB-backed queue worker (WorkerLoop).
2//!
3//! `WorkerLoop` runs a reaper→claim→spawn cycle. Each iteration:
4//! 1. Runs `db::reaper` to reset stuck claimed rows.
5//! 2. Calls `db::claim` to atomically take the next pending job.
6//! 3. Spawns a task that executes the handler with panic isolation.
7//!
8//! Shutdown is triggered by SIGTERM, Ctrl-C, or a call to `WorkerLoop::shutdown()`.
9//! On shutdown the loop drains in-flight jobs and calls `db::requeue_claimed_by`
10//! to reset any claimed-but-unstarted rows back to `pending` (D-10).
11
12use crate::{Error, Job};
13use async_trait::async_trait;
14use chrono::Utc;
15use futures::FutureExt;
16use sea_orm::DatabaseConnection;
17use std::collections::HashMap;
18use std::future::Future;
19use std::panic::AssertUnwindSafe;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::Semaphore;
25use tracing::{debug, error, info, warn};
26
27// ---------------------------------------------------------------------------
28// TenantScopeProvider — preserved verbatim from previous worker.rs
29// ---------------------------------------------------------------------------
30
31/// Injects tenant scope around job execution.
32///
33/// Implemented by the framework — injected at startup via `WorkerLoop::with_tenant_scope()`.
34/// The provider receives a tenant_id and a boxed future representing the job execution.
35/// It must resolve the tenant, establish a task-local scope, and run the future within it.
36/// Returns `TenantNotFound` error if the tenant ID does not resolve to a valid tenant.
37#[async_trait]
38pub trait TenantScopeProvider: Send + Sync {
39    /// Run the given future within a tenant scope for the specified tenant.
40    async fn with_scope(
41        &self,
42        tenant_id: i64,
43        f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
44    ) -> Result<(), Error>;
45}
46
47// ---------------------------------------------------------------------------
48// WorkerConfig
49// ---------------------------------------------------------------------------
50
/// Tunable settings for a queue worker.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Names of the queues to poll, highest priority first.
    pub queues: Vec<String>,
    /// Upper bound on the number of jobs executing at the same time.
    pub max_jobs: usize,
    /// How long the worker idles when no queue had a job to claim.
    pub sleep_duration: Duration,
    /// When `true`, the worker stops on the first reaper/claim error instead
    /// of logging it and carrying on.
    pub stop_on_error: bool,
    /// How long a claimed job stays invisible before the reaper reclaims it.
    pub visibility_timeout: Duration,
}

impl Default for WorkerConfig {
    /// A single `"default"` queue, 10 concurrent jobs, a 1 s idle sleep,
    /// keep running on errors, and a 5 minute visibility timeout.
    fn default() -> Self {
        Self {
            queues: vec![String::from("default")],
            max_jobs: 10,
            sleep_duration: Duration::from_secs(1),
            stop_on_error: false,
            visibility_timeout: Duration::from_secs(5 * 60),
        }
    }
}

impl WorkerConfig {
    /// Creates a config that polls `queues`; every other setting keeps its
    /// default value.
    pub fn new(queues: Vec<String>) -> Self {
        Self {
            queues,
            ..Self::default()
        }
    }

    /// Returns the config with the concurrent-job limit replaced by `max`.
    pub fn max_jobs(self, max: usize) -> Self {
        Self {
            max_jobs: max,
            ..self
        }
    }

    /// Returns the config with the visibility timeout replaced by `d`.
    pub fn with_visibility_timeout(self, d: Duration) -> Self {
        Self {
            visibility_timeout: d,
            ..self
        }
    }
}
99
100// ---------------------------------------------------------------------------
101// JobHandler type alias
102// ---------------------------------------------------------------------------
103
104/// Handler closure stored per job type name.
105///
106/// Returns `(Result<(), Error>, retry_delay_secs)` — the delay is the
107/// per-job `retry_delay(attempt)` value captured at registration time.
108type JobHandler = Arc<
109    dyn Fn(String, u32) -> Pin<Box<dyn Future<Output = (Result<(), Error>, Duration)> + Send>>
110        + Send
111        + Sync,
112>;
113
114// ---------------------------------------------------------------------------
115// WorkerLoop
116// ---------------------------------------------------------------------------
117
118/// DB-backed queue worker.
119///
120/// Runs a reaper→claim→spawn cycle until signalled to stop. Panic isolation
121/// (D-11) ensures a panicking job handler never kills the loop. Graceful
122/// shutdown (D-10) drains in-flight jobs and resets claimed-but-unstarted rows.
123pub struct WorkerLoop {
124    /// Worker configuration.
125    config: WorkerConfig,
126    /// Registered job handlers by type name.
127    handlers: HashMap<String, JobHandler>,
128    /// Semaphore limiting concurrent in-flight jobs.
129    semaphore: Arc<Semaphore>,
130    /// Shutdown flag — set by SIGTERM/Ctrl-C or by `WorkerLoop::shutdown()`.
131    shutdown: Arc<AtomicBool>,
132    /// Optional tenant scope provider.
133    tenant_scope: Option<Arc<dyn TenantScopeProvider>>,
134    /// Unique identifier for this worker instance (used by requeue_claimed_by).
135    worker_id: String,
136}
137
138impl WorkerLoop {
139    /// Create a new worker loop.
140    ///
141    /// A random UUID is generated as the worker ID. The worker reads the DB
142    /// connection from `Queue::connection()` — no connection argument needed.
143    pub fn new(config: WorkerConfig) -> Self {
144        let semaphore = Arc::new(Semaphore::new(config.max_jobs));
145        Self {
146            config,
147            handlers: HashMap::new(),
148            semaphore,
149            shutdown: Arc::new(AtomicBool::new(false)),
150            tenant_scope: None,
151            worker_id: uuid::Uuid::new_v4().to_string(),
152        }
153    }
154
155    /// Inject a tenant scope provider for tenant-aware job execution.
156    ///
157    /// When set, jobs with a `tenant_id` are executed inside a tenant context
158    /// scope. Jobs without a tenant_id or workers without a provider run in
159    /// the default (system) scope.
160    pub fn with_tenant_scope(mut self, provider: Arc<dyn TenantScopeProvider>) -> Self {
161        self.tenant_scope = Some(provider);
162        self
163    }
164
165    /// Register a job handler.
166    ///
167    /// The handler closure deserializes the job from JSON, calls `handle()`,
168    /// and also captures the per-job `retry_delay(attempt)` for use on failure.
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// worker_loop.register::<SendEmailJob>();
174    /// ```
175    pub fn register<J>(&mut self)
176    where
177        J: Job + serde::de::DeserializeOwned + 'static,
178    {
179        let type_name = std::any::type_name::<J>().to_string();
180
181        let handler: JobHandler = Arc::new(move |data: String, attempt: u32| {
182            Box::pin(async move {
183                let job: J = match serde_json::from_str::<J>(&data) {
184                    Ok(j) => j,
185                    Err(e) => {
186                        return (
187                            Err(Error::DeserializationFailed(e.to_string())),
188                            Duration::from_secs(5),
189                        )
190                    }
191                };
192                // Capture retry_delay before consuming the job.
193                let delay = job.retry_delay(attempt);
194                let result = job.handle().await;
195                (result, delay)
196            })
197        });
198
199        self.handlers.insert(type_name, handler);
200    }
201
202    /// Build a `WorkerLoop` and apply all job types registered via `Queue::register`.
203    ///
204    /// This is the entry point used by the framework's server boot path to create
205    /// the auto-started worker when at least one job type has been registered.
206    pub fn from_registry(config: WorkerConfig) -> Self {
207        let mut w = Self::new(config);
208        crate::db::Queue::apply_registrars(&mut w);
209        w
210    }
211
212    /// Signal the worker loop to shut down gracefully.
213    ///
214    /// Sets the same AtomicBool that the SIGTERM/Ctrl-C handler sets, so
215    /// programmatic and signal-based shutdown share one path.
216    pub fn shutdown(&self) {
217        self.shutdown.store(true, Ordering::SeqCst);
218    }
219
220    /// Run the worker loop until shutdown.
221    ///
222    /// Loop behaviour:
223    /// 1. Check shutdown flag — if set, drain in-flight jobs and requeue.
224    /// 2. For each queue: run reaper, then attempt claim.
225    ///    - If a job is claimed, spawn it and loop immediately (no idle sleep).
226    /// 3. If no jobs were found across all queues, sleep `config.sleep_duration`.
227    pub async fn run(&self) -> Result<(), Error> {
228        let conn: &'static DatabaseConnection = crate::db::Queue::connection();
229
230        info!(
231            worker_id = %self.worker_id,
232            queues = ?self.config.queues,
233            max_jobs = self.config.max_jobs,
234            "WorkerLoop starting"
235        );
236
237        // Install SIGTERM / Ctrl-C handler — sets the same shutdown flag.
238        // Hold the JoinHandle so it is aborted when `run()` returns (WR-02);
239        // otherwise a clean shutdown or a `stop_on_error` exit would leak a
240        // detached task holding a `shutdown` Arc clone, and a second `run()`
241        // would install a duplicate handler.
242        let signal_task = {
243            let shutdown = self.shutdown.clone();
244            tokio::spawn(async move {
245                // Registration failure must not panic inside a detached task
246                // (unobservable to the caller). Log and request shutdown instead.
247                let mut sigterm = match tokio::signal::unix::signal(
248                    tokio::signal::unix::SignalKind::terminate(),
249                ) {
250                    Ok(s) => s,
251                    Err(e) => {
252                        error!(error = %e, "failed to install SIGTERM handler — requesting shutdown");
253                        shutdown.store(true, Ordering::SeqCst);
254                        return;
255                    }
256                };
257                tokio::select! {
258                    _ = sigterm.recv() => {
259                        info!("SIGTERM received — shutting down WorkerLoop");
260                    }
261                    _ = tokio::signal::ctrl_c() => {
262                        info!("Ctrl-C received — shutting down WorkerLoop");
263                    }
264                }
265                shutdown.store(true, Ordering::SeqCst);
266            })
267        };
268        // Abort the signal task on any exit path from this function.
269        let _signal_guard = AbortOnDrop(signal_task);
270
271        'outer: loop {
272            // --- Shutdown gate ---
273            if self.shutdown.load(Ordering::SeqCst) {
274                info!(worker_id = %self.worker_id, "Shutdown flag set — draining in-flight jobs");
275
276                // Drain: acquire ALL permits and HOLD them across the requeue
277                // (WR-03). Binding to a named guard keeps the permits held until
278                // the end of this scope; `let _ =` would release them
279                // immediately, letting a still-pending spawn_job task grab a
280                // permit and start a new job after the drain — which
281                // requeue_claimed_by could then yank out from under it.
282                let _drain_guard = self
283                    .semaphore
284                    .acquire_many(self.config.max_jobs as u32)
285                    .await;
286
287                // Requeue any claimed-but-unstarted rows belonging to this worker.
288                crate::db::requeue_claimed_by(conn, &self.worker_id)
289                    .await
290                    .map_err(|e| {
291                        error!(error = %e, "requeue_claimed_by failed during shutdown");
292                        e
293                    })?;
294
295                info!(worker_id = %self.worker_id, "WorkerLoop shut down cleanly");
296                // _drain_guard dropped here, after requeue completes.
297                return Ok(());
298            }
299
300            // --- Reaper + claim cycle ---
301            for queue in &self.config.queues {
302                // Run reaper before each claim attempt (D-14).
303                match crate::db::reaper(conn, queue, self.config.visibility_timeout).await {
304                    Ok(()) => {}
305                    Err(e) => {
306                        error!(queue = %queue, error = %e, "reaper error");
307                        if self.config.stop_on_error {
308                            return Err(e);
309                        }
310                    }
311                }
312
313                // Attempt an atomic claim.
314                match crate::db::claim(conn, queue, &self.worker_id).await {
315                    Ok(Some(job_row)) => {
316                        self.spawn_job(conn, job_row);
317                        continue 'outer; // claimed something — loop immediately without sleep
318                    }
319                    Ok(None) => {} // nothing in this queue
320                    Err(e) => {
321                        error!(queue = %queue, error = %e, "claim error");
322                        if self.config.stop_on_error {
323                            return Err(e);
324                        }
325                    }
326                }
327            }
328
329            // No jobs found across all queues — idle sleep (D-08).
330            tokio::time::sleep(self.config.sleep_duration).await;
331        }
332    }
333
334    /// Spawn a task that executes `job_row` with panic isolation.
335    ///
336    /// Acquires a semaphore permit before spawning. The permit is held for the
337    /// lifetime of the task, enforcing `config.max_jobs` concurrency.
338    fn spawn_job(&self, conn: &'static DatabaseConnection, job_row: crate::db::JobRow) {
339        // Clone the semaphore Arc; the spawned task acquires an owned permit inside,
340        // keeping it held for the full duration of the job.
341        let permit = self.semaphore.clone();
342        let handlers = self.handlers.clone();
343        let tenant_scope = self.tenant_scope.clone();
344        let worker_id = self.worker_id.clone();
345        let shutdown = self.shutdown.clone();
346
347        tokio::spawn(async move {
348            // Gate on the shutdown flag before doing any work (WR-03). If drain
349            // has begun, do not start this job — the claimed row will be reset
350            // to pending by `requeue_claimed_by`. Checking before acquiring the
351            // permit also avoids contending with the drain's `acquire_many`.
352            if shutdown.load(Ordering::SeqCst) {
353                return;
354            }
355
356            // Acquire the permit inside the task so it is held for the full duration.
357            let _permit = permit.acquire_owned().await.expect("semaphore closed");
358
359            // Re-check after acquiring: drain may have set the flag while we
360            // waited on the permit. If so, bail before executing so the claimed
361            // row is left for `requeue_claimed_by` rather than running a job
362            // whose row is about to be (or has been) requeued.
363            if shutdown.load(Ordering::SeqCst) {
364                return;
365            }
366
367            let job_id = job_row.id;
368            let job_type = job_row.job_type.clone();
369            let tenant_id = job_row.tenant_id;
370            let attempts = job_row.attempts;
371            let max_retries = job_row.max_retries;
372
373            debug!(
374                job_id = %job_id,
375                job_type = %job_type,
376                attempts = attempts,
377                tenant_id = ?tenant_id,
378                worker_id = %worker_id,
379                "Executing job"
380            );
381
382            let handler = match handlers.get(&job_type) {
383                Some(h) => h.clone(),
384                None => {
385                    warn!(job_type = %job_type, "No handler registered — releasing job for retry");
386                    // Release with a short delay — the job will be retried.
387                    let available_at = Utc::now()
388                        + chrono::Duration::from_std(Duration::from_secs(5)).unwrap_or_default();
389                    crate::db::release_job(conn, job_id, attempts + 1, available_at)
390                        .await
391                        .ok();
392                    return;
393                }
394            };
395
396            // Panic-isolated execution (D-11, T-185-03).
397            // AssertUnwindSafe is sound here: the handler closure captures only
398            // Arc references and owned data; we don't observe any interior state
399            // after a panic.
400            let result = AssertUnwindSafe(async move {
401                // Tenant scope wrap (D-17, T-185-08).
402                match (&tenant_scope, tenant_id) {
403                    (Some(scope), Some(id)) => {
404                        let fut = Box::pin(async move {
405                            let (res, _delay) = handler(job_row.payload.clone(), attempts).await;
406                            res
407                        });
408                        (scope.with_scope(id, fut).await, Duration::from_secs(5))
409                    }
410                    _ => handler(job_row.payload.clone(), attempts).await,
411                }
412            })
413            .catch_unwind()
414            .await;
415
416            match result {
417                // Success path: delete the row (D-04 delete-on-success).
418                Ok((Ok(()), _)) => {
419                    debug!(job_id = %job_id, job_type = %job_type, "Job succeeded — deleting row");
420                    crate::db::delete_job(conn, job_id).await.ok();
421                }
422
423                // Handler returned Err — normal failure path.
424                Ok((Err(e), retry_delay)) => {
425                    error!(job_id = %job_id, job_type = %job_type, error = %e, "Job handler returned error");
426                    handle_failure(
427                        conn,
428                        job_id,
429                        attempts,
430                        max_retries,
431                        &e.to_string(),
432                        retry_delay,
433                    )
434                    .await;
435                }
436
437                // Handler panicked — counts as a failed attempt (D-11).
438                Err(_panic) => {
439                    error!(job_id = %job_id, job_type = %job_type, "Job handler panicked — counting as failure");
440                    let msg = "job handler panicked";
441                    // Use a default jitter delay for panics (we can't call retry_delay
442                    // because the handler destructured before the panic).
443                    let delay = default_jitter_delay(attempts);
444                    handle_failure(conn, job_id, attempts, max_retries, msg, delay).await;
445                }
446            }
447        });
448    }
449}
450
451/// Handle a job failure: either park as failed or release for retry.
452async fn handle_failure(
453    conn: &'static DatabaseConnection,
454    job_id: i64,
455    attempts: u32,
456    max_retries: u32,
457    err_msg: &str,
458    retry_delay: Duration,
459) {
460    if attempts + 1 >= max_retries {
461        // Exhausted — park as failed.
462        warn!(job_id = %job_id, attempts = attempts, "Job exhausted retries — parking as failed");
463        crate::db::fail_job(conn, job_id, err_msg).await.ok();
464    } else {
465        // Retry — release with jittered delay.
466        let available_at = Utc::now() + chrono::Duration::from_std(retry_delay).unwrap_or_default();
467        debug!(
468            job_id = %job_id,
469            retry_at = %available_at,
470            "Scheduling job retry"
471        );
472        crate::db::release_job(conn, job_id, attempts + 1, available_at)
473            .await
474            .ok();
475    }
476}
477
478/// Full-jitter exponential backoff for panic cases where `retry_delay` cannot
479/// be called on the job instance.
480///
481/// Formula: `rand(0..=min(cap, base * 2^attempt))`, base 5 s, cap 15 min.
482fn default_jitter_delay(attempt: u32) -> Duration {
483    use rand::Rng;
484    let base_secs: u64 = 5;
485    let cap_secs: u64 = 15 * 60;
486    let max_delay = cap_secs.min(base_secs.saturating_mul(2u64.saturating_pow(attempt)));
487    let jitter = rand::thread_rng().gen_range(0..=max_delay);
488    Duration::from_secs(jitter)
489}
490
491/// RAII guard that aborts a spawned task when dropped.
492///
493/// Used to tie the SIGTERM/Ctrl-C signal task's lifetime to `WorkerLoop::run`
494/// so it never outlives the loop on any exit path (WR-02).
495struct AbortOnDrop(tokio::task::JoinHandle<()>);
496
497impl Drop for AbortOnDrop {
498    fn drop(&mut self) {
499        self.0.abort();
500    }
501}
502
503/// Type alias for API continuity with callers that use the old `Worker` name.
504pub type Worker = WorkerLoop;
505
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Boxed job future in the shape `TenantScopeProvider::with_scope` accepts.
    type BoxedJob = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;

    /// Scope-dispatch decision exercised by the `test_scope_dispatch_*` tests:
    /// go through the provider only when both a provider and a tenant id are
    /// present, otherwise await the job directly.
    async fn dispatch(
        scope: Option<&Arc<dyn TenantScopeProvider>>,
        tenant_id: Option<i64>,
        job: BoxedJob,
    ) -> Result<(), Error> {
        match (scope, tenant_id) {
            (Some(provider), Some(id)) => provider.with_scope(id, job).await,
            _ => job.await,
        }
    }

    /// Builds a job future that flips the returned flag to `true` when it runs.
    fn tracked_job() -> (Arc<Mutex<bool>>, BoxedJob) {
        let ran = Arc::new(Mutex::new(false));
        let flag = ran.clone();
        let job: BoxedJob = Box::pin(async move {
            *flag.lock().unwrap() = true;
            Ok(())
        });
        (ran, job)
    }

    /// Verifies TenantScopeProvider is object-safe (usable as `Arc<dyn TenantScopeProvider>`).
    #[test]
    fn test_tenant_scope_provider_is_object_safe() {
        struct NoopProvider;

        #[async_trait]
        impl TenantScopeProvider for NoopProvider {
            async fn with_scope(
                &self,
                _tenant_id: i64,
                f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
            ) -> Result<(), Error> {
                f.await
            }
        }

        // Compiling this coercion is the whole test.
        let _provider: Arc<dyn TenantScopeProvider> = Arc::new(NoopProvider);
    }

    /// Mock TenantScopeProvider that records the tenant ids it is called with
    /// and can be told to fail.
    struct MockScopeProvider {
        called_with: Arc<Mutex<Vec<i64>>>,
        should_fail: bool,
    }

    impl MockScopeProvider {
        fn with_failure(should_fail: bool) -> Self {
            Self {
                called_with: Arc::new(Mutex::new(Vec::new())),
                should_fail,
            }
        }

        fn new() -> Self {
            Self::with_failure(false)
        }

        fn failing() -> Self {
            Self::with_failure(true)
        }
    }

    #[async_trait]
    impl TenantScopeProvider for MockScopeProvider {
        async fn with_scope(
            &self,
            tenant_id: i64,
            f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
        ) -> Result<(), Error> {
            self.called_with.lock().unwrap().push(tenant_id);
            if self.should_fail {
                return Err(Error::tenant_not_found(tenant_id));
            }
            f.await
        }
    }

    /// WorkerLoop can be constructed without a connection argument.
    #[test]
    fn test_worker_loop_new() {
        let worker = WorkerLoop::new(WorkerConfig::default());
        assert!(worker.tenant_scope.is_none());
        assert!(!worker.worker_id.is_empty());
    }

    /// Worker::with_tenant_scope() stores the provider.
    #[test]
    fn test_with_tenant_scope_stores_provider() {
        let worker = WorkerLoop::new(WorkerConfig::default())
            .with_tenant_scope(Arc::new(MockScopeProvider::new()));
        assert!(worker.tenant_scope.is_some());
    }

    /// A worker has no tenant scope unless one is injected.
    #[test]
    fn test_worker_without_scope_has_none_by_default() {
        let worker = WorkerLoop::new(WorkerConfig::default());
        assert!(worker.tenant_scope.is_none());
    }

    /// MockScopeProvider: with_scope runs the job future and records the tenant_id.
    #[tokio::test]
    async fn test_mock_scope_provider_calls_future() {
        let provider = MockScopeProvider::new();
        let calls = provider.called_with.clone();

        let result = provider.with_scope(42, Box::pin(async { Ok(()) })).await;

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[42]);
    }

    /// MockScopeProvider: the failing variant returns TenantNotFound.
    #[tokio::test]
    async fn test_mock_scope_provider_failure_returns_tenant_not_found() {
        let provider = MockScopeProvider::failing();

        let result = provider.with_scope(99, Box::pin(async { Ok(()) })).await;

        assert!(matches!(
            result,
            Err(Error::TenantNotFound { tenant_id: 99 })
        ));
    }

    /// scope_dispatch_for_tenant: Some(id) + provider -> with_scope called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_some_calls_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = mock.called_with.clone();
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(Arc::new(mock));
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(tenant_scope.as_ref(), Some(1), job_fut).await;

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[1i64]);
        assert!(*job_ran.lock().unwrap(), "job future must have been called");
    }

    /// scope_dispatch_no_tenant_id: None + provider -> with_scope NOT called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_none_skips_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = mock.called_with.clone();
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(Arc::new(mock));
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(tenant_scope.as_ref(), None, job_fut).await;

        assert!(result.is_ok());
        assert!(
            calls.lock().unwrap().is_empty(),
            "with_scope must not be called when tenant_id is None"
        );
        assert!(
            *job_ran.lock().unwrap(),
            "job future must still run directly"
        );
    }

    /// scope_dispatch_no_provider: Some(id) + no provider -> job runs directly.
    #[tokio::test]
    async fn test_scope_dispatch_no_provider_runs_job_directly() {
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = None;
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(tenant_scope.as_ref(), Some(1), job_fut).await;

        assert!(result.is_ok());
        assert!(
            *job_ran.lock().unwrap(),
            "job must run directly without a provider"
        );
    }

    /// Shutdown flag is set by WorkerLoop::shutdown().
    #[test]
    fn test_shutdown_sets_flag() {
        let worker = WorkerLoop::new(WorkerConfig::default());
        assert!(!worker.shutdown.load(Ordering::SeqCst));
        worker.shutdown();
        assert!(worker.shutdown.load(Ordering::SeqCst));
    }

    /// WorkerConfig visibility_timeout default is 5 minutes.
    #[test]
    fn test_worker_config_visibility_timeout_default() {
        let config = WorkerConfig::default();
        assert_eq!(config.visibility_timeout, Duration::from_secs(300));
    }

    /// default_jitter_delay stays within bounds.
    #[test]
    fn test_default_jitter_delay_bounds() {
        for _ in 0..50 {
            assert!(default_jitter_delay(0).as_secs() <= 5);
            assert!(default_jitter_delay(3).as_secs() <= 40);
            assert!(default_jitter_delay(30).as_secs() <= 900);
        }
    }
}