Skip to main content

ferro_queue/
worker.rs

1//! DB-backed queue worker (WorkerLoop).
2//!
3//! `WorkerLoop` runs a reaper→claim→spawn cycle. Each iteration:
4//! 1. Runs `db::reaper` to reset stuck claimed rows.
5//! 2. Calls `db::claim` to atomically take the next pending job.
6//! 3. Spawns a task that executes the handler with panic isolation.
7//!
8//! Shutdown is triggered by SIGTERM, Ctrl-C, or a call to `WorkerLoop::shutdown()`.
9//! On shutdown the loop drains in-flight jobs and calls `db::requeue_claimed_by`
10//! to reset any claimed-but-unstarted rows back to `pending` (D-10).
11
12use crate::{Error, Job};
13use async_trait::async_trait;
14use chrono::Utc;
15use futures::FutureExt;
16use sea_orm::DatabaseConnection;
17use std::collections::HashMap;
18use std::future::Future;
19use std::panic::AssertUnwindSafe;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::Semaphore;
25use tracing::{debug, error, info, warn};
26
27// ---------------------------------------------------------------------------
28// TenantScopeProvider — preserved verbatim from previous worker.rs
29// ---------------------------------------------------------------------------
30
31/// Injects tenant scope around job execution.
32///
33/// Implemented by the framework — injected at startup via `WorkerLoop::with_tenant_scope()`.
34/// The provider receives a tenant_id and a boxed future representing the job execution.
35/// It must resolve the tenant, establish a task-local scope, and run the future within it.
36/// Returns `TenantNotFound` error if the tenant ID does not resolve to a valid tenant.
37#[async_trait]
38pub trait TenantScopeProvider: Send + Sync {
39    /// Run the given future within a tenant scope for the specified tenant.
40    async fn with_scope(
41        &self,
42        tenant_id: i64,
43        f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
44    ) -> Result<(), Error>;
45}
46
47// ---------------------------------------------------------------------------
48// WorkerConfig
49// ---------------------------------------------------------------------------
50
/// Worker configuration.
///
/// Build with [`WorkerConfig::new`] (or `Default`) plus the chained setters:
///
/// ```rust,ignore
/// let config = WorkerConfig::new(vec!["emails".into()])
///     .max_jobs(4)
///     .with_visibility_timeout(Duration::from_secs(120));
/// ```
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Queues to process, highest priority first.
    pub queues: Vec<String>,
    /// Maximum number of jobs executing concurrently. Must be at least 1 for
    /// the worker to run any job.
    pub max_jobs: usize,
    /// How long to sleep when no queue has a job available.
    pub sleep_duration: Duration,
    /// When `true`, a reaper or claim error ends `WorkerLoop::run` with that
    /// error instead of being logged and skipped.
    pub stop_on_error: bool,
    /// How long a claimed job stays invisible before the reaper reclaims it.
    pub visibility_timeout: Duration,
}

impl Default for WorkerConfig {
    /// Defaults: the `"default"` queue, 10 concurrent jobs, 1 s idle sleep,
    /// keep running on errors, 5 min visibility timeout.
    fn default() -> Self {
        Self {
            queues: vec!["default".to_string()],
            max_jobs: 10,
            sleep_duration: Duration::from_secs(1),
            stop_on_error: false,
            visibility_timeout: Duration::from_secs(300),
        }
    }
}

impl WorkerConfig {
    /// Create a config for specific queues; every other field takes its default.
    #[must_use]
    pub fn new(queues: Vec<String>) -> Self {
        Self {
            queues,
            ..Default::default()
        }
    }

    /// Set the maximum number of concurrent jobs.
    #[must_use]
    pub fn max_jobs(mut self, max: usize) -> Self {
        self.max_jobs = max;
        self
    }

    /// Set the visibility timeout.
    #[must_use]
    pub fn with_visibility_timeout(mut self, d: Duration) -> Self {
        self.visibility_timeout = d;
        self
    }

    /// Set how long the worker sleeps when no job is available.
    #[must_use]
    pub fn with_sleep_duration(mut self, d: Duration) -> Self {
        self.sleep_duration = d;
        self
    }

    /// Set whether reaper/claim errors stop the worker instead of being logged.
    #[must_use]
    pub fn with_stop_on_error(mut self, stop: bool) -> Self {
        self.stop_on_error = stop;
        self
    }
}
99
100// ---------------------------------------------------------------------------
101// JobHandler type alias
102// ---------------------------------------------------------------------------
103
104/// Handler closure stored per job type name.
105///
106/// Returns `(Result<(), Error>, retry_delay_secs)` — the delay is the
107/// per-job `retry_delay(attempt)` value captured at registration time.
108type JobHandler = Arc<
109    dyn Fn(String, u32) -> Pin<Box<dyn Future<Output = (Result<(), Error>, Duration)> + Send>>
110        + Send
111        + Sync,
112>;
113
114// ---------------------------------------------------------------------------
115// WorkerLoop
116// ---------------------------------------------------------------------------
117
118/// DB-backed queue worker.
119///
120/// Runs a reaper→claim→spawn cycle until signalled to stop. Panic isolation
121/// (D-11) ensures a panicking job handler never kills the loop. Graceful
122/// shutdown (D-10) drains in-flight jobs and resets claimed-but-unstarted rows.
123pub struct WorkerLoop {
124    /// Worker configuration.
125    config: WorkerConfig,
126    /// Registered job handlers by type name.
127    handlers: HashMap<String, JobHandler>,
128    /// Semaphore limiting concurrent in-flight jobs.
129    semaphore: Arc<Semaphore>,
130    /// Shutdown flag — set by SIGTERM/Ctrl-C or by `WorkerLoop::shutdown()`.
131    shutdown: Arc<AtomicBool>,
132    /// Optional tenant scope provider.
133    tenant_scope: Option<Arc<dyn TenantScopeProvider>>,
134    /// Unique identifier for this worker instance (used by requeue_claimed_by).
135    worker_id: String,
136}
137
138impl WorkerLoop {
139    /// Create a new worker loop.
140    ///
141    /// A random UUID is generated as the worker ID. The worker reads the DB
142    /// connection from `Queue::connection()` — no connection argument needed.
143    pub fn new(config: WorkerConfig) -> Self {
144        let semaphore = Arc::new(Semaphore::new(config.max_jobs));
145        Self {
146            config,
147            handlers: HashMap::new(),
148            semaphore,
149            shutdown: Arc::new(AtomicBool::new(false)),
150            tenant_scope: None,
151            worker_id: uuid::Uuid::new_v4().to_string(),
152        }
153    }
154
155    /// Inject a tenant scope provider for tenant-aware job execution.
156    ///
157    /// When set, jobs with a `tenant_id` are executed inside a tenant context
158    /// scope. Jobs without a tenant_id or workers without a provider run in
159    /// the default (system) scope.
160    pub fn with_tenant_scope(mut self, provider: Arc<dyn TenantScopeProvider>) -> Self {
161        self.tenant_scope = Some(provider);
162        self
163    }
164
165    /// Register a job handler.
166    ///
167    /// The handler closure deserializes the job from JSON, calls `handle()`,
168    /// and also captures the per-job `retry_delay(attempt)` for use on failure.
169    ///
170    /// # Example
171    ///
172    /// ```rust,ignore
173    /// worker_loop.register::<SendEmailJob>();
174    /// ```
175    pub fn register<J>(&mut self)
176    where
177        J: Job + serde::de::DeserializeOwned + 'static,
178    {
179        let type_name = std::any::type_name::<J>().to_string();
180
181        let handler: JobHandler = Arc::new(move |data: String, attempt: u32| {
182            Box::pin(async move {
183                let job: J = match serde_json::from_str::<J>(&data) {
184                    Ok(j) => j,
185                    Err(e) => {
186                        return (
187                            Err(Error::DeserializationFailed(e.to_string())),
188                            Duration::from_secs(5),
189                        )
190                    }
191                };
192                // Capture retry_delay before consuming the job.
193                let delay = job.retry_delay(attempt);
194                let result = job.handle().await;
195                (result, delay)
196            })
197        });
198
199        self.handlers.insert(type_name, handler);
200    }
201
202    /// Build a `WorkerLoop` and apply all job types registered via `Queue::register`.
203    ///
204    /// This is the entry point used by the framework's server boot path to create
205    /// the auto-started worker when at least one job type has been registered.
206    pub fn from_registry(config: WorkerConfig) -> Self {
207        let mut w = Self::new(config);
208        crate::db::Queue::apply_registrars(&mut w);
209        w
210    }
211
212    /// Signal the worker loop to shut down gracefully.
213    ///
214    /// Sets the same AtomicBool that the SIGTERM/Ctrl-C handler sets, so
215    /// programmatic and signal-based shutdown share one path.
216    pub fn shutdown(&self) {
217        self.shutdown.store(true, Ordering::SeqCst);
218    }
219
220    /// Run the worker loop until shutdown.
221    ///
222    /// Loop behaviour:
223    /// 1. Check shutdown flag — if set, drain in-flight jobs and requeue.
224    /// 2. For each queue: run reaper, then attempt claim.
225    ///    - If a job is claimed, spawn it and loop immediately (no idle sleep).
226    /// 3. If no jobs were found across all queues, sleep `config.sleep_duration`.
227    pub async fn run(&self) -> Result<(), Error> {
228        let conn: &'static DatabaseConnection = crate::db::Queue::connection();
229
230        info!(
231            worker_id = %self.worker_id,
232            queues = ?self.config.queues,
233            max_jobs = self.config.max_jobs,
234            "WorkerLoop starting"
235        );
236
237        // Install SIGTERM / Ctrl-C handler — sets the same shutdown flag.
238        // Hold the JoinHandle so it is aborted when `run()` returns (WR-02);
239        // otherwise a clean shutdown or a `stop_on_error` exit would leak a
240        // detached task holding a `shutdown` Arc clone, and a second `run()`
241        // would install a duplicate handler.
242        let signal_task = {
243            let shutdown = self.shutdown.clone();
244            tokio::spawn(async move {
245                // Registration failure must not panic inside a detached task
246                // (unobservable to the caller). Log and request shutdown instead.
247                let mut sigterm = match tokio::signal::unix::signal(
248                    tokio::signal::unix::SignalKind::terminate(),
249                ) {
250                    Ok(s) => s,
251                    Err(e) => {
252                        error!(error = %e, "failed to install SIGTERM handler — requesting shutdown");
253                        shutdown.store(true, Ordering::SeqCst);
254                        return;
255                    }
256                };
257                tokio::select! {
258                    _ = sigterm.recv() => {
259                        info!("SIGTERM received — shutting down WorkerLoop");
260                    }
261                    _ = tokio::signal::ctrl_c() => {
262                        info!("Ctrl-C received — shutting down WorkerLoop");
263                    }
264                }
265                shutdown.store(true, Ordering::SeqCst);
266            })
267        };
268        // Abort the signal task on any exit path from this function.
269        let _signal_guard = AbortOnDrop(signal_task);
270
271        // Reap orphan `claimed` rows left behind by a previous worker that died
272        // mid-job (SIGKILL on deploy restart, OOM). Without this they linger
273        // until the visibility-timeout reaper catches them — up to
274        // `config.visibility_timeout` (default 300s) of stale "in progress" UI.
275        // Runs once before the claim loop starts. Scoped to this worker's
276        // queues so concurrent workers on disjoint queues don't clobber each
277        // other; the model assumed is single-worker-per-queue.
278        match crate::db::reap_startup_claims(conn, &self.config.queues).await {
279            Ok(0) => {}
280            Ok(n) => {
281                info!(worker_id = %self.worker_id, reaped = n, "reaped orphan claimed jobs at startup")
282            }
283            Err(e) => {
284                error!(worker_id = %self.worker_id, error = %e, "startup orphan reap failed — continuing")
285            }
286        }
287
288        'outer: loop {
289            // --- Shutdown gate ---
290            if self.shutdown.load(Ordering::SeqCst) {
291                info!(worker_id = %self.worker_id, "Shutdown flag set — draining in-flight jobs");
292
293                // Drain: acquire ALL permits and HOLD them across the requeue
294                // (WR-03). Binding to a named guard keeps the permits held until
295                // the end of this scope; `let _ =` would release them
296                // immediately, letting a still-pending spawn_job task grab a
297                // permit and start a new job after the drain — which
298                // requeue_claimed_by could then yank out from under it.
299                let _drain_guard = self
300                    .semaphore
301                    .acquire_many(self.config.max_jobs as u32)
302                    .await;
303
304                // Requeue any claimed-but-unstarted rows belonging to this worker.
305                crate::db::requeue_claimed_by(conn, &self.worker_id)
306                    .await
307                    .map_err(|e| {
308                        error!(error = %e, "requeue_claimed_by failed during shutdown");
309                        e
310                    })?;
311
312                info!(worker_id = %self.worker_id, "WorkerLoop shut down cleanly");
313                // _drain_guard dropped here, after requeue completes.
314                return Ok(());
315            }
316
317            // --- Reaper + claim cycle ---
318            for queue in &self.config.queues {
319                // Run reaper before each claim attempt (D-14).
320                match crate::db::reaper(conn, queue, self.config.visibility_timeout).await {
321                    Ok(()) => {}
322                    Err(e) => {
323                        error!(queue = %queue, error = %e, "reaper error");
324                        if self.config.stop_on_error {
325                            return Err(e);
326                        }
327                    }
328                }
329
330                // Attempt an atomic claim.
331                match crate::db::claim(conn, queue, &self.worker_id).await {
332                    Ok(Some(job_row)) => {
333                        self.spawn_job(conn, job_row);
334                        continue 'outer; // claimed something — loop immediately without sleep
335                    }
336                    Ok(None) => {} // nothing in this queue
337                    Err(e) => {
338                        error!(queue = %queue, error = %e, "claim error");
339                        if self.config.stop_on_error {
340                            return Err(e);
341                        }
342                    }
343                }
344            }
345
346            // No jobs found across all queues — idle sleep (D-08).
347            tokio::time::sleep(self.config.sleep_duration).await;
348        }
349    }
350
351    /// Spawn a task that executes `job_row` with panic isolation.
352    ///
353    /// Acquires a semaphore permit before spawning. The permit is held for the
354    /// lifetime of the task, enforcing `config.max_jobs` concurrency.
355    fn spawn_job(&self, conn: &'static DatabaseConnection, job_row: crate::db::JobRow) {
356        // Clone the semaphore Arc; the spawned task acquires an owned permit inside,
357        // keeping it held for the full duration of the job.
358        let permit = self.semaphore.clone();
359        let handlers = self.handlers.clone();
360        let tenant_scope = self.tenant_scope.clone();
361        let worker_id = self.worker_id.clone();
362        let shutdown = self.shutdown.clone();
363
364        tokio::spawn(async move {
365            // Gate on the shutdown flag before doing any work (WR-03). If drain
366            // has begun, do not start this job — the claimed row will be reset
367            // to pending by `requeue_claimed_by`. Checking before acquiring the
368            // permit also avoids contending with the drain's `acquire_many`.
369            if shutdown.load(Ordering::SeqCst) {
370                return;
371            }
372
373            // Acquire the permit inside the task so it is held for the full duration.
374            let _permit = permit.acquire_owned().await.expect("semaphore closed");
375
376            // Re-check after acquiring: drain may have set the flag while we
377            // waited on the permit. If so, bail before executing so the claimed
378            // row is left for `requeue_claimed_by` rather than running a job
379            // whose row is about to be (or has been) requeued.
380            if shutdown.load(Ordering::SeqCst) {
381                return;
382            }
383
384            let job_id = job_row.id;
385            let job_type = job_row.job_type.clone();
386            let tenant_id = job_row.tenant_id;
387            let attempts = job_row.attempts;
388            let max_retries = job_row.max_retries;
389
390            debug!(
391                job_id = %job_id,
392                job_type = %job_type,
393                attempts = attempts,
394                tenant_id = ?tenant_id,
395                worker_id = %worker_id,
396                "Executing job"
397            );
398
399            let handler = match handlers.get(&job_type) {
400                Some(h) => h.clone(),
401                None => {
402                    warn!(job_type = %job_type, "No handler registered — releasing job for retry");
403                    // Release with a short delay — the job will be retried.
404                    let available_at = Utc::now()
405                        + chrono::Duration::from_std(Duration::from_secs(5)).unwrap_or_default();
406                    crate::db::release_job(conn, job_id, attempts + 1, available_at)
407                        .await
408                        .ok();
409                    return;
410                }
411            };
412
413            // Panic-isolated execution (D-11, T-185-03).
414            // AssertUnwindSafe is sound here: the handler closure captures only
415            // Arc references and owned data; we don't observe any interior state
416            // after a panic.
417            let result = AssertUnwindSafe(async move {
418                // Tenant scope wrap (D-17, T-185-08).
419                match (&tenant_scope, tenant_id) {
420                    (Some(scope), Some(id)) => {
421                        let fut = Box::pin(async move {
422                            let (res, _delay) = handler(job_row.payload.clone(), attempts).await;
423                            res
424                        });
425                        (scope.with_scope(id, fut).await, Duration::from_secs(5))
426                    }
427                    _ => handler(job_row.payload.clone(), attempts).await,
428                }
429            })
430            .catch_unwind()
431            .await;
432
433            match result {
434                // Success path: delete the row (D-04 delete-on-success).
435                Ok((Ok(()), _)) => {
436                    debug!(job_id = %job_id, job_type = %job_type, "Job succeeded — deleting row");
437                    crate::db::delete_job(conn, job_id).await.ok();
438                }
439
440                // Handler returned Err — normal failure path.
441                Ok((Err(e), retry_delay)) => {
442                    error!(job_id = %job_id, job_type = %job_type, error = %e, "Job handler returned error");
443                    handle_failure(
444                        conn,
445                        job_id,
446                        attempts,
447                        max_retries,
448                        &e.to_string(),
449                        retry_delay,
450                    )
451                    .await;
452                }
453
454                // Handler panicked — counts as a failed attempt (D-11).
455                Err(_panic) => {
456                    error!(job_id = %job_id, job_type = %job_type, "Job handler panicked — counting as failure");
457                    let msg = "job handler panicked";
458                    // Use a default jitter delay for panics (we can't call retry_delay
459                    // because the handler destructured before the panic).
460                    let delay = default_jitter_delay(attempts);
461                    handle_failure(conn, job_id, attempts, max_retries, msg, delay).await;
462                }
463            }
464        });
465    }
466}
467
468/// Handle a job failure: either park as failed or release for retry.
469async fn handle_failure(
470    conn: &'static DatabaseConnection,
471    job_id: i64,
472    attempts: u32,
473    max_retries: u32,
474    err_msg: &str,
475    retry_delay: Duration,
476) {
477    if attempts + 1 >= max_retries {
478        // Exhausted — park as failed.
479        warn!(job_id = %job_id, attempts = attempts, "Job exhausted retries — parking as failed");
480        crate::db::fail_job(conn, job_id, err_msg).await.ok();
481    } else {
482        // Retry — release with jittered delay.
483        let available_at = Utc::now() + chrono::Duration::from_std(retry_delay).unwrap_or_default();
484        debug!(
485            job_id = %job_id,
486            retry_at = %available_at,
487            "Scheduling job retry"
488        );
489        crate::db::release_job(conn, job_id, attempts + 1, available_at)
490            .await
491            .ok();
492    }
493}
494
495/// Full-jitter exponential backoff for panic cases where `retry_delay` cannot
496/// be called on the job instance.
497///
498/// Formula: `rand(0..=min(cap, base * 2^attempt))`, base 5 s, cap 15 min.
499fn default_jitter_delay(attempt: u32) -> Duration {
500    use rand::Rng;
501    let base_secs: u64 = 5;
502    let cap_secs: u64 = 15 * 60;
503    let max_delay = cap_secs.min(base_secs.saturating_mul(2u64.saturating_pow(attempt)));
504    let jitter = rand::thread_rng().gen_range(0..=max_delay);
505    Duration::from_secs(jitter)
506}
507
508/// RAII guard that aborts a spawned task when dropped.
509///
510/// Used to tie the SIGTERM/Ctrl-C signal task's lifetime to `WorkerLoop::run`
511/// so it never outlives the loop on any exit path (WR-02).
512struct AbortOnDrop(tokio::task::JoinHandle<()>);
513
514impl Drop for AbortOnDrop {
515    fn drop(&mut self) {
516        self.0.abort();
517    }
518}
519
520/// Type alias for API continuity with callers that use the old `Worker` name.
521pub type Worker = WorkerLoop;
522
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Verifies TenantScopeProvider is object-safe (can be wrapped in Arc<dyn TenantScopeProvider>).
    #[test]
    fn test_tenant_scope_provider_is_object_safe() {
        struct NoopProvider;

        #[async_trait]
        impl TenantScopeProvider for NoopProvider {
            async fn with_scope(
                &self,
                _tenant_id: i64,
                f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
            ) -> Result<(), Error> {
                f.await
            }
        }

        // If this compiles, the trait is object-safe.
        let _provider: Arc<dyn TenantScopeProvider> = Arc::new(NoopProvider);
    }

    /// Mock TenantScopeProvider that tracks calls and optionally fails.
    struct MockScopeProvider {
        called_with: Arc<Mutex<Vec<i64>>>,
        should_fail: bool,
    }

    impl MockScopeProvider {
        fn new() -> Self {
            Self {
                called_with: Arc::new(Mutex::new(Vec::new())),
                should_fail: false,
            }
        }

        fn failing() -> Self {
            Self {
                called_with: Arc::new(Mutex::new(Vec::new())),
                should_fail: true,
            }
        }
    }

    #[async_trait]
    impl TenantScopeProvider for MockScopeProvider {
        async fn with_scope(
            &self,
            tenant_id: i64,
            f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
        ) -> Result<(), Error> {
            self.called_with.lock().unwrap().push(tenant_id);
            if self.should_fail {
                return Err(Error::tenant_not_found(tenant_id));
            }
            f.await
        }
    }

    /// WorkerLoop can be constructed without a connection argument.
    #[test]
    fn test_worker_loop_new() {
        let w = WorkerLoop::new(WorkerConfig::default());
        assert!(w.tenant_scope.is_none());
        assert!(!w.worker_id.is_empty());
    }

    /// Each WorkerLoop gets its own worker ID (requeue_claimed_by relies on it).
    #[test]
    fn test_worker_ids_are_unique() {
        let a = WorkerLoop::new(WorkerConfig::default());
        let b = WorkerLoop::new(WorkerConfig::default());
        assert_ne!(a.worker_id, b.worker_id);
    }

    /// The concurrency semaphore is sized from `max_jobs`.
    #[test]
    fn test_semaphore_sized_from_max_jobs() {
        let w = WorkerLoop::new(WorkerConfig::default().max_jobs(4));
        assert_eq!(w.semaphore.available_permits(), 4);
    }

    /// Worker::with_tenant_scope() stores the provider.
    #[test]
    fn test_with_tenant_scope_stores_provider() {
        let w = WorkerLoop::new(WorkerConfig::default());
        let provider = Arc::new(MockScopeProvider::new());
        let w = w.with_tenant_scope(provider);
        assert!(w.tenant_scope.is_some());
    }

    /// Worker without tenant_scope has None by default.
    #[test]
    fn test_worker_without_scope_has_none_by_default() {
        let w = WorkerLoop::new(WorkerConfig::default());
        assert!(w.tenant_scope.is_none());
    }

    /// MockScopeProvider: with_scope calls the job future and records the tenant_id.
    #[tokio::test]
    async fn test_mock_scope_provider_calls_future() {
        let provider = MockScopeProvider::new();
        let calls = provider.called_with.clone();

        let result = provider.with_scope(42, Box::pin(async { Ok(()) })).await;

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[42]);
    }

    /// MockScopeProvider: failing variant returns TenantNotFound.
    #[tokio::test]
    async fn test_mock_scope_provider_failure_returns_tenant_not_found() {
        let provider = MockScopeProvider::failing();

        let result = provider.with_scope(99, Box::pin(async { Ok(()) })).await;

        assert!(matches!(
            result,
            Err(Error::TenantNotFound { tenant_id: 99 })
        ));
    }

    /// scope_dispatch_for_tenant: Some(id) + provider -> with_scope called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_some_calls_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = mock.called_with.clone();
        let provider: Arc<dyn TenantScopeProvider> = Arc::new(mock);

        let tenant_id: Option<i64> = Some(1);
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(provider);

        let job_ran = Arc::new(Mutex::new(false));
        let job_ran_clone = job_ran.clone();
        let job_fut = Box::pin(async move {
            *job_ran_clone.lock().unwrap() = true;
            Ok(())
        });

        let result = match (&tenant_scope, tenant_id) {
            (Some(scope), Some(id)) => scope.with_scope(id, job_fut).await,
            _ => job_fut.await,
        };

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[1i64]);
        assert!(*job_ran.lock().unwrap(), "job future must have been called");
    }

    /// scope_dispatch_no_tenant_id: None + provider -> with_scope NOT called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_none_skips_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = mock.called_with.clone();
        let provider: Arc<dyn TenantScopeProvider> = Arc::new(mock);

        let tenant_id: Option<i64> = None;
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(provider);

        let job_ran = Arc::new(Mutex::new(false));
        let job_ran_clone = job_ran.clone();
        let job_fut = Box::pin(async move {
            *job_ran_clone.lock().unwrap() = true;
            Ok(())
        });

        let result = match (&tenant_scope, tenant_id) {
            (Some(scope), Some(id)) => scope.with_scope(id, job_fut).await,
            _ => job_fut.await,
        };

        assert!(result.is_ok());
        assert!(
            calls.lock().unwrap().is_empty(),
            "with_scope must not be called when tenant_id is None"
        );
        assert!(
            *job_ran.lock().unwrap(),
            "job future must still run directly"
        );
    }

    /// scope_dispatch_no_provider: Some(id) + no provider -> job runs directly.
    #[tokio::test]
    async fn test_scope_dispatch_no_provider_runs_job_directly() {
        let tenant_id: Option<i64> = Some(1);
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = None;

        let job_ran = Arc::new(Mutex::new(false));
        let job_ran_clone = job_ran.clone();
        let job_fut = Box::pin(async move {
            *job_ran_clone.lock().unwrap() = true;
            Ok(())
        });

        let result = match (&tenant_scope, tenant_id) {
            (Some(scope), Some(id)) => scope.with_scope(id, job_fut).await,
            _ => job_fut.await,
        };

        assert!(result.is_ok());
        assert!(
            *job_ran.lock().unwrap(),
            "job must run directly without a provider"
        );
    }

    /// Shutdown flag is set by WorkerLoop::shutdown().
    #[test]
    fn test_shutdown_sets_flag() {
        let w = WorkerLoop::new(WorkerConfig::default());
        assert!(!w.shutdown.load(Ordering::SeqCst));
        w.shutdown();
        assert!(w.shutdown.load(Ordering::SeqCst));
    }

    /// WorkerConfig visibility_timeout default is 5 minutes.
    #[test]
    fn test_worker_config_visibility_timeout_default() {
        let c = WorkerConfig::default();
        assert_eq!(c.visibility_timeout, Duration::from_secs(300));
    }

    /// WorkerConfig builders set their field and leave the rest at defaults.
    #[test]
    fn test_worker_config_builders() {
        let c = WorkerConfig::new(vec!["emails".to_string()])
            .max_jobs(3)
            .with_visibility_timeout(Duration::from_secs(60));
        assert_eq!(c.queues, vec!["emails".to_string()]);
        assert_eq!(c.max_jobs, 3);
        assert_eq!(c.visibility_timeout, Duration::from_secs(60));
        assert_eq!(c.sleep_duration, Duration::from_secs(1));
        assert!(!c.stop_on_error);
    }

    /// default_jitter_delay stays within bounds.
    #[test]
    fn test_default_jitter_delay_bounds() {
        for _ in 0..50 {
            assert!(default_jitter_delay(0).as_secs() <= 5);
            assert!(default_jitter_delay(3).as_secs() <= 40);
            assert!(default_jitter_delay(30).as_secs() <= 900);
        }
    }

    /// default_jitter_delay clamps to the cap instead of overflowing at extreme attempts.
    #[test]
    fn test_default_jitter_delay_saturates_at_cap() {
        for attempt in [10, 63, 64, u32::MAX] {
            assert!(default_jitter_delay(attempt).as_secs() <= 900);
        }
    }

    /// Dropping an AbortOnDrop cancels the wrapped task (its captures are dropped).
    #[tokio::test]
    async fn test_abort_on_drop_cancels_task() {
        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
        let guard = AbortOnDrop(tokio::spawn(async move {
            let _tx = tx;
            std::future::pending::<()>().await;
        }));
        drop(guard);
        assert!(
            rx.await.is_err(),
            "aborted task must drop its sender without sending"
        );
    }
}