ferro_queue/worker.rs
1//! DB-backed queue worker (WorkerLoop).
2//!
3//! `WorkerLoop` runs a reaper→claim→spawn cycle. Each iteration:
4//! 1. Runs `db::reaper` to reset stuck claimed rows.
5//! 2. Calls `db::claim` to atomically take the next pending job.
6//! 3. Spawns a task that executes the handler with panic isolation.
7//!
8//! Shutdown is triggered by SIGTERM, Ctrl-C, or a call to `WorkerLoop::shutdown()`.
9//! On shutdown the loop drains in-flight jobs and calls `db::requeue_claimed_by`
10//! to reset any claimed-but-unstarted rows back to `pending` (D-10).
11
12use crate::{Error, Job};
13use async_trait::async_trait;
14use chrono::Utc;
15use futures::FutureExt;
16use sea_orm::DatabaseConnection;
17use std::collections::HashMap;
18use std::future::Future;
19use std::panic::AssertUnwindSafe;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::Semaphore;
25use tracing::{debug, error, info, warn};
26
27// ---------------------------------------------------------------------------
28// TenantScopeProvider — preserved verbatim from previous worker.rs
29// ---------------------------------------------------------------------------
30
31/// Injects tenant scope around job execution.
32///
33/// Implemented by the framework — injected at startup via `WorkerLoop::with_tenant_scope()`.
34/// The provider receives a tenant_id and a boxed future representing the job execution.
35/// It must resolve the tenant, establish a task-local scope, and run the future within it.
36/// Returns `TenantNotFound` error if the tenant ID does not resolve to a valid tenant.
37#[async_trait]
38pub trait TenantScopeProvider: Send + Sync {
39 /// Run the given future within a tenant scope for the specified tenant.
40 async fn with_scope(
41 &self,
42 tenant_id: i64,
43 f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
44 ) -> Result<(), Error>;
45}
46
47// ---------------------------------------------------------------------------
48// WorkerConfig
49// ---------------------------------------------------------------------------
50
/// Tunable settings for a queue worker.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Names of the queues to poll, highest priority first.
    pub queues: Vec<String>,
    /// Upper bound on jobs executing at the same time.
    pub max_jobs: usize,
    /// How long to sleep when no queue yielded a job.
    pub sleep_duration: Duration,
    /// Whether the worker stops on error.
    pub stop_on_error: bool,
    /// How long a claimed job stays invisible before the reaper reclaims it.
    pub visibility_timeout: Duration,
}

impl Default for WorkerConfig {
    /// Configuration for the single `"default"` queue with stock settings.
    fn default() -> Self {
        Self::new(vec![String::from("default")])
    }
}

impl WorkerConfig {
    /// Build a config that polls `queues`, with every other setting at its
    /// default: 10 concurrent jobs, 1 s idle sleep, no stop-on-error, and a
    /// 300 s visibility timeout.
    pub fn new(queues: Vec<String>) -> Self {
        Self {
            queues,
            max_jobs: 10,
            sleep_duration: Duration::from_secs(1),
            stop_on_error: false,
            visibility_timeout: Duration::from_secs(300),
        }
    }

    /// Builder-style setter for the maximum number of concurrent jobs.
    pub fn max_jobs(self, max: usize) -> Self {
        Self {
            max_jobs: max,
            ..self
        }
    }

    /// Builder-style setter for the visibility timeout.
    pub fn with_visibility_timeout(self, d: Duration) -> Self {
        Self {
            visibility_timeout: d,
            ..self
        }
    }
}
99
100// ---------------------------------------------------------------------------
101// JobHandler type alias
102// ---------------------------------------------------------------------------
103
104/// Handler closure stored per job type name.
105///
106/// Returns `(Result<(), Error>, retry_delay_secs)` — the delay is the
107/// per-job `retry_delay(attempt)` value captured at registration time.
108type JobHandler = Arc<
109 dyn Fn(String, u32) -> Pin<Box<dyn Future<Output = (Result<(), Error>, Duration)> + Send>>
110 + Send
111 + Sync,
112>;
113
114// ---------------------------------------------------------------------------
115// WorkerLoop
116// ---------------------------------------------------------------------------
117
118/// DB-backed queue worker.
119///
120/// Runs a reaper→claim→spawn cycle until signalled to stop. Panic isolation
121/// (D-11) ensures a panicking job handler never kills the loop. Graceful
122/// shutdown (D-10) drains in-flight jobs and resets claimed-but-unstarted rows.
123pub struct WorkerLoop {
124 /// Worker configuration.
125 config: WorkerConfig,
126 /// Registered job handlers by type name.
127 handlers: HashMap<String, JobHandler>,
128 /// Semaphore limiting concurrent in-flight jobs.
129 semaphore: Arc<Semaphore>,
130 /// Shutdown flag — set by SIGTERM/Ctrl-C or by `WorkerLoop::shutdown()`.
131 shutdown: Arc<AtomicBool>,
132 /// Optional tenant scope provider.
133 tenant_scope: Option<Arc<dyn TenantScopeProvider>>,
134 /// Unique identifier for this worker instance (used by requeue_claimed_by).
135 worker_id: String,
136}
137
138impl WorkerLoop {
139 /// Create a new worker loop.
140 ///
141 /// A random UUID is generated as the worker ID. The worker reads the DB
142 /// connection from `Queue::connection()` — no connection argument needed.
143 pub fn new(config: WorkerConfig) -> Self {
144 let semaphore = Arc::new(Semaphore::new(config.max_jobs));
145 Self {
146 config,
147 handlers: HashMap::new(),
148 semaphore,
149 shutdown: Arc::new(AtomicBool::new(false)),
150 tenant_scope: None,
151 worker_id: uuid::Uuid::new_v4().to_string(),
152 }
153 }
154
155 /// Inject a tenant scope provider for tenant-aware job execution.
156 ///
157 /// When set, jobs with a `tenant_id` are executed inside a tenant context
158 /// scope. Jobs without a tenant_id or workers without a provider run in
159 /// the default (system) scope.
160 pub fn with_tenant_scope(mut self, provider: Arc<dyn TenantScopeProvider>) -> Self {
161 self.tenant_scope = Some(provider);
162 self
163 }
164
165 /// Register a job handler.
166 ///
167 /// The handler closure deserializes the job from JSON, calls `handle()`,
168 /// and also captures the per-job `retry_delay(attempt)` for use on failure.
169 ///
170 /// # Example
171 ///
172 /// ```rust,ignore
173 /// worker_loop.register::<SendEmailJob>();
174 /// ```
175 pub fn register<J>(&mut self)
176 where
177 J: Job + serde::de::DeserializeOwned + 'static,
178 {
179 let type_name = std::any::type_name::<J>().to_string();
180
181 let handler: JobHandler = Arc::new(move |data: String, attempt: u32| {
182 Box::pin(async move {
183 let job: J = match serde_json::from_str::<J>(&data) {
184 Ok(j) => j,
185 Err(e) => {
186 return (
187 Err(Error::DeserializationFailed(e.to_string())),
188 Duration::from_secs(5),
189 )
190 }
191 };
192 // Capture retry_delay before consuming the job.
193 let delay = job.retry_delay(attempt);
194 let result = job.handle().await;
195 (result, delay)
196 })
197 });
198
199 self.handlers.insert(type_name, handler);
200 }
201
202 /// Build a `WorkerLoop` and apply all job types registered via `Queue::register`.
203 ///
204 /// This is the entry point used by the framework's server boot path to create
205 /// the auto-started worker when at least one job type has been registered.
206 pub fn from_registry(config: WorkerConfig) -> Self {
207 let mut w = Self::new(config);
208 crate::db::Queue::apply_registrars(&mut w);
209 w
210 }
211
212 /// Signal the worker loop to shut down gracefully.
213 ///
214 /// Sets the same AtomicBool that the SIGTERM/Ctrl-C handler sets, so
215 /// programmatic and signal-based shutdown share one path.
216 pub fn shutdown(&self) {
217 self.shutdown.store(true, Ordering::SeqCst);
218 }
219
220 /// Run the worker loop until shutdown.
221 ///
222 /// Loop behaviour:
223 /// 1. Check shutdown flag — if set, drain in-flight jobs and requeue.
224 /// 2. For each queue: run reaper, then attempt claim.
225 /// - If a job is claimed, spawn it and loop immediately (no idle sleep).
226 /// 3. If no jobs were found across all queues, sleep `config.sleep_duration`.
227 pub async fn run(&self) -> Result<(), Error> {
228 let conn: &'static DatabaseConnection = crate::db::Queue::connection();
229
230 info!(
231 worker_id = %self.worker_id,
232 queues = ?self.config.queues,
233 max_jobs = self.config.max_jobs,
234 "WorkerLoop starting"
235 );
236
237 // Install SIGTERM / Ctrl-C handler — sets the same shutdown flag.
238 // Hold the JoinHandle so it is aborted when `run()` returns (WR-02);
239 // otherwise a clean shutdown or a `stop_on_error` exit would leak a
240 // detached task holding a `shutdown` Arc clone, and a second `run()`
241 // would install a duplicate handler.
242 let signal_task = {
243 let shutdown = self.shutdown.clone();
244 tokio::spawn(async move {
245 // Registration failure must not panic inside a detached task
246 // (unobservable to the caller). Log and request shutdown instead.
247 let mut sigterm = match tokio::signal::unix::signal(
248 tokio::signal::unix::SignalKind::terminate(),
249 ) {
250 Ok(s) => s,
251 Err(e) => {
252 error!(error = %e, "failed to install SIGTERM handler — requesting shutdown");
253 shutdown.store(true, Ordering::SeqCst);
254 return;
255 }
256 };
257 tokio::select! {
258 _ = sigterm.recv() => {
259 info!("SIGTERM received — shutting down WorkerLoop");
260 }
261 _ = tokio::signal::ctrl_c() => {
262 info!("Ctrl-C received — shutting down WorkerLoop");
263 }
264 }
265 shutdown.store(true, Ordering::SeqCst);
266 })
267 };
268 // Abort the signal task on any exit path from this function.
269 let _signal_guard = AbortOnDrop(signal_task);
270
271 // Reap orphan `claimed` rows left behind by a previous worker that died
272 // mid-job (SIGKILL on deploy restart, OOM). Without this they linger
273 // until the visibility-timeout reaper catches them — up to
274 // `config.visibility_timeout` (default 300s) of stale "in progress" UI.
275 // Runs once before the claim loop starts. Scoped to this worker's
276 // queues so concurrent workers on disjoint queues don't clobber each
277 // other; the model assumed is single-worker-per-queue.
278 match crate::db::reap_startup_claims(conn, &self.config.queues).await {
279 Ok(0) => {}
280 Ok(n) => {
281 info!(worker_id = %self.worker_id, reaped = n, "reaped orphan claimed jobs at startup")
282 }
283 Err(e) => {
284 error!(worker_id = %self.worker_id, error = %e, "startup orphan reap failed — continuing")
285 }
286 }
287
288 'outer: loop {
289 // --- Shutdown gate ---
290 if self.shutdown.load(Ordering::SeqCst) {
291 info!(worker_id = %self.worker_id, "Shutdown flag set — draining in-flight jobs");
292
293 // Drain: acquire ALL permits and HOLD them across the requeue
294 // (WR-03). Binding to a named guard keeps the permits held until
295 // the end of this scope; `let _ =` would release them
296 // immediately, letting a still-pending spawn_job task grab a
297 // permit and start a new job after the drain — which
298 // requeue_claimed_by could then yank out from under it.
299 let _drain_guard = self
300 .semaphore
301 .acquire_many(self.config.max_jobs as u32)
302 .await;
303
304 // Requeue any claimed-but-unstarted rows belonging to this worker.
305 crate::db::requeue_claimed_by(conn, &self.worker_id)
306 .await
307 .map_err(|e| {
308 error!(error = %e, "requeue_claimed_by failed during shutdown");
309 e
310 })?;
311
312 info!(worker_id = %self.worker_id, "WorkerLoop shut down cleanly");
313 // _drain_guard dropped here, after requeue completes.
314 return Ok(());
315 }
316
317 // --- Reaper + claim cycle ---
318 for queue in &self.config.queues {
319 // Run reaper before each claim attempt (D-14).
320 match crate::db::reaper(conn, queue, self.config.visibility_timeout).await {
321 Ok(()) => {}
322 Err(e) => {
323 error!(queue = %queue, error = %e, "reaper error");
324 if self.config.stop_on_error {
325 return Err(e);
326 }
327 }
328 }
329
330 // Attempt an atomic claim.
331 match crate::db::claim(conn, queue, &self.worker_id).await {
332 Ok(Some(job_row)) => {
333 self.spawn_job(conn, job_row);
334 continue 'outer; // claimed something — loop immediately without sleep
335 }
336 Ok(None) => {} // nothing in this queue
337 Err(e) => {
338 error!(queue = %queue, error = %e, "claim error");
339 if self.config.stop_on_error {
340 return Err(e);
341 }
342 }
343 }
344 }
345
346 // No jobs found across all queues — idle sleep (D-08).
347 tokio::time::sleep(self.config.sleep_duration).await;
348 }
349 }
350
351 /// Spawn a task that executes `job_row` with panic isolation.
352 ///
353 /// Acquires a semaphore permit before spawning. The permit is held for the
354 /// lifetime of the task, enforcing `config.max_jobs` concurrency.
355 fn spawn_job(&self, conn: &'static DatabaseConnection, job_row: crate::db::JobRow) {
356 // Clone the semaphore Arc; the spawned task acquires an owned permit inside,
357 // keeping it held for the full duration of the job.
358 let permit = self.semaphore.clone();
359 let handlers = self.handlers.clone();
360 let tenant_scope = self.tenant_scope.clone();
361 let worker_id = self.worker_id.clone();
362 let shutdown = self.shutdown.clone();
363
364 tokio::spawn(async move {
365 // Gate on the shutdown flag before doing any work (WR-03). If drain
366 // has begun, do not start this job — the claimed row will be reset
367 // to pending by `requeue_claimed_by`. Checking before acquiring the
368 // permit also avoids contending with the drain's `acquire_many`.
369 if shutdown.load(Ordering::SeqCst) {
370 return;
371 }
372
373 // Acquire the permit inside the task so it is held for the full duration.
374 let _permit = permit.acquire_owned().await.expect("semaphore closed");
375
376 // Re-check after acquiring: drain may have set the flag while we
377 // waited on the permit. If so, bail before executing so the claimed
378 // row is left for `requeue_claimed_by` rather than running a job
379 // whose row is about to be (or has been) requeued.
380 if shutdown.load(Ordering::SeqCst) {
381 return;
382 }
383
384 let job_id = job_row.id;
385 let job_type = job_row.job_type.clone();
386 let tenant_id = job_row.tenant_id;
387 let attempts = job_row.attempts;
388 let max_retries = job_row.max_retries;
389
390 debug!(
391 job_id = %job_id,
392 job_type = %job_type,
393 attempts = attempts,
394 tenant_id = ?tenant_id,
395 worker_id = %worker_id,
396 "Executing job"
397 );
398
399 let handler = match handlers.get(&job_type) {
400 Some(h) => h.clone(),
401 None => {
402 warn!(job_type = %job_type, "No handler registered — releasing job for retry");
403 // Release with a short delay — the job will be retried.
404 let available_at = Utc::now()
405 + chrono::Duration::from_std(Duration::from_secs(5)).unwrap_or_default();
406 crate::db::release_job(conn, job_id, attempts + 1, available_at)
407 .await
408 .ok();
409 return;
410 }
411 };
412
413 // Panic-isolated execution (D-11, T-185-03).
414 // AssertUnwindSafe is sound here: the handler closure captures only
415 // Arc references and owned data; we don't observe any interior state
416 // after a panic.
417 let result = AssertUnwindSafe(async move {
418 // Tenant scope wrap (D-17, T-185-08).
419 match (&tenant_scope, tenant_id) {
420 (Some(scope), Some(id)) => {
421 let fut = Box::pin(async move {
422 let (res, _delay) = handler(job_row.payload.clone(), attempts).await;
423 res
424 });
425 (scope.with_scope(id, fut).await, Duration::from_secs(5))
426 }
427 _ => handler(job_row.payload.clone(), attempts).await,
428 }
429 })
430 .catch_unwind()
431 .await;
432
433 match result {
434 // Success path: delete the row (D-04 delete-on-success).
435 Ok((Ok(()), _)) => {
436 debug!(job_id = %job_id, job_type = %job_type, "Job succeeded — deleting row");
437 crate::db::delete_job(conn, job_id).await.ok();
438 }
439
440 // Handler returned Err — normal failure path.
441 Ok((Err(e), retry_delay)) => {
442 error!(job_id = %job_id, job_type = %job_type, error = %e, "Job handler returned error");
443 handle_failure(
444 conn,
445 job_id,
446 attempts,
447 max_retries,
448 &e.to_string(),
449 retry_delay,
450 )
451 .await;
452 }
453
454 // Handler panicked — counts as a failed attempt (D-11).
455 Err(_panic) => {
456 error!(job_id = %job_id, job_type = %job_type, "Job handler panicked — counting as failure");
457 let msg = "job handler panicked";
458 // Use a default jitter delay for panics (we can't call retry_delay
459 // because the handler destructured before the panic).
460 let delay = default_jitter_delay(attempts);
461 handle_failure(conn, job_id, attempts, max_retries, msg, delay).await;
462 }
463 }
464 });
465 }
466}
467
468/// Handle a job failure: either park as failed or release for retry.
469async fn handle_failure(
470 conn: &'static DatabaseConnection,
471 job_id: i64,
472 attempts: u32,
473 max_retries: u32,
474 err_msg: &str,
475 retry_delay: Duration,
476) {
477 if attempts + 1 >= max_retries {
478 // Exhausted — park as failed.
479 warn!(job_id = %job_id, attempts = attempts, "Job exhausted retries — parking as failed");
480 crate::db::fail_job(conn, job_id, err_msg).await.ok();
481 } else {
482 // Retry — release with jittered delay.
483 let available_at = Utc::now() + chrono::Duration::from_std(retry_delay).unwrap_or_default();
484 debug!(
485 job_id = %job_id,
486 retry_at = %available_at,
487 "Scheduling job retry"
488 );
489 crate::db::release_job(conn, job_id, attempts + 1, available_at)
490 .await
491 .ok();
492 }
493}
494
495/// Full-jitter exponential backoff for panic cases where `retry_delay` cannot
496/// be called on the job instance.
497///
498/// Formula: `rand(0..=min(cap, base * 2^attempt))`, base 5 s, cap 15 min.
499fn default_jitter_delay(attempt: u32) -> Duration {
500 use rand::Rng;
501 let base_secs: u64 = 5;
502 let cap_secs: u64 = 15 * 60;
503 let max_delay = cap_secs.min(base_secs.saturating_mul(2u64.saturating_pow(attempt)));
504 let jitter = rand::thread_rng().gen_range(0..=max_delay);
505 Duration::from_secs(jitter)
506}
507
508/// RAII guard that aborts a spawned task when dropped.
509///
510/// Used to tie the SIGTERM/Ctrl-C signal task's lifetime to `WorkerLoop::run`
511/// so it never outlives the loop on any exit path (WR-02).
512struct AbortOnDrop(tokio::task::JoinHandle<()>);
513
514impl Drop for AbortOnDrop {
515 fn drop(&mut self) {
516 self.0.abort();
517 }
518}
519
/// Backward-compatible alias for [`WorkerLoop`], kept for API continuity
/// with callers that still use the old `Worker` name.
pub type Worker = WorkerLoop;
522
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Boxed job future in the shape `TenantScopeProvider::with_scope` accepts.
    type BoxedJob = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;

    /// Builds a job future that sets a shared flag when it runs, returning
    /// the flag alongside the future so tests can observe execution.
    fn tracked_job() -> (Arc<Mutex<bool>>, BoxedJob) {
        let ran = Arc::new(Mutex::new(false));
        let flag = Arc::clone(&ran);
        let job: BoxedJob = Box::pin(async move {
            *flag.lock().unwrap() = true;
            Ok(())
        });
        (ran, job)
    }

    /// Applies the scope-dispatch rule under test: route through the provider
    /// only when both a provider and a tenant ID are present, otherwise await
    /// the job directly.
    async fn dispatch(
        tenant_scope: &Option<Arc<dyn TenantScopeProvider>>,
        tenant_id: Option<i64>,
        job: BoxedJob,
    ) -> Result<(), Error> {
        match (tenant_scope, tenant_id) {
            (Some(scope), Some(id)) => scope.with_scope(id, job).await,
            _ => job.await,
        }
    }

    /// Verifies TenantScopeProvider is object-safe (usable as `Arc<dyn TenantScopeProvider>`).
    #[test]
    fn test_tenant_scope_provider_is_object_safe() {
        struct NoopProvider;

        #[async_trait]
        impl TenantScopeProvider for NoopProvider {
            async fn with_scope(
                &self,
                _tenant_id: i64,
                f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
            ) -> Result<(), Error> {
                f.await
            }
        }

        // Compiling this coercion is the whole test.
        let _provider: Arc<dyn TenantScopeProvider> = Arc::new(NoopProvider);
    }

    /// Mock TenantScopeProvider that records the tenant IDs it was called
    /// with and can be configured to fail.
    struct MockScopeProvider {
        called_with: Arc<Mutex<Vec<i64>>>,
        should_fail: bool,
    }

    impl MockScopeProvider {
        /// Shared constructor behind `new()` and `failing()`.
        fn with_outcome(should_fail: bool) -> Self {
            Self {
                called_with: Arc::default(),
                should_fail,
            }
        }

        /// Provider that runs the job future it is given.
        fn new() -> Self {
            Self::with_outcome(false)
        }

        /// Provider that rejects every tenant without running the job.
        fn failing() -> Self {
            Self::with_outcome(true)
        }
    }

    #[async_trait]
    impl TenantScopeProvider for MockScopeProvider {
        async fn with_scope(
            &self,
            tenant_id: i64,
            f: Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
        ) -> Result<(), Error> {
            self.called_with.lock().unwrap().push(tenant_id);
            if self.should_fail {
                Err(Error::tenant_not_found(tenant_id))
            } else {
                f.await
            }
        }
    }

    /// WorkerLoop can be constructed without a connection argument.
    #[test]
    fn test_worker_loop_new() {
        let worker = WorkerLoop::new(WorkerConfig::default());
        assert!(worker.tenant_scope.is_none());
        assert!(!worker.worker_id.is_empty());
    }

    /// Worker::with_tenant_scope() stores the provider.
    #[test]
    fn test_with_tenant_scope_stores_provider() {
        let worker = WorkerLoop::new(WorkerConfig::default())
            .with_tenant_scope(Arc::new(MockScopeProvider::new()));
        assert!(worker.tenant_scope.is_some());
    }

    /// Worker without tenant_scope has None by default.
    #[test]
    fn test_worker_without_scope_has_none_by_default() {
        assert!(WorkerLoop::new(WorkerConfig::default())
            .tenant_scope
            .is_none());
    }

    /// MockScopeProvider: with_scope runs the job future and records the tenant_id.
    #[tokio::test]
    async fn test_mock_scope_provider_calls_future() {
        let provider = MockScopeProvider::new();
        let calls = Arc::clone(&provider.called_with);

        let result = provider.with_scope(42, Box::pin(async { Ok(()) })).await;

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[42]);
    }

    /// MockScopeProvider: failing variant returns TenantNotFound.
    #[tokio::test]
    async fn test_mock_scope_provider_failure_returns_tenant_not_found() {
        let result = MockScopeProvider::failing()
            .with_scope(99, Box::pin(async { Ok(()) }))
            .await;

        assert!(matches!(
            result,
            Err(Error::TenantNotFound { tenant_id: 99 })
        ));
    }

    /// scope_dispatch_for_tenant: Some(id) + provider -> with_scope called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_some_calls_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = Arc::clone(&mock.called_with);
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(Arc::new(mock));
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(&tenant_scope, Some(1), job_fut).await;

        assert!(result.is_ok());
        assert_eq!(calls.lock().unwrap().as_slice(), &[1i64]);
        assert!(*job_ran.lock().unwrap(), "job future must have been called");
    }

    /// scope_dispatch_no_tenant_id: None + provider -> with_scope NOT called.
    #[tokio::test]
    async fn test_scope_dispatch_tenant_id_none_skips_with_scope() {
        let mock = MockScopeProvider::new();
        let calls = Arc::clone(&mock.called_with);
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = Some(Arc::new(mock));
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(&tenant_scope, None, job_fut).await;

        assert!(result.is_ok());
        assert!(
            calls.lock().unwrap().is_empty(),
            "with_scope must not be called when tenant_id is None"
        );
        assert!(
            *job_ran.lock().unwrap(),
            "job future must still run directly"
        );
    }

    /// scope_dispatch_no_provider: Some(id) + no provider -> job runs directly.
    #[tokio::test]
    async fn test_scope_dispatch_no_provider_runs_job_directly() {
        let tenant_scope: Option<Arc<dyn TenantScopeProvider>> = None;
        let (job_ran, job_fut) = tracked_job();

        let result = dispatch(&tenant_scope, Some(1), job_fut).await;

        assert!(result.is_ok());
        assert!(
            *job_ran.lock().unwrap(),
            "job must run directly without a provider"
        );
    }

    /// Shutdown flag is set by WorkerLoop::shutdown().
    #[test]
    fn test_shutdown_sets_flag() {
        let worker = WorkerLoop::new(WorkerConfig::default());
        assert!(!worker.shutdown.load(Ordering::SeqCst));
        worker.shutdown();
        assert!(worker.shutdown.load(Ordering::SeqCst));
    }

    /// WorkerConfig visibility_timeout default is 5 minutes.
    #[test]
    fn test_worker_config_visibility_timeout_default() {
        assert_eq!(
            WorkerConfig::default().visibility_timeout,
            Duration::from_secs(300)
        );
    }

    /// default_jitter_delay stays within bounds.
    #[test]
    fn test_default_jitter_delay_bounds() {
        // (attempt, inclusive upper bound in seconds)
        let cases: [(u32, u64); 3] = [(0, 5), (3, 40), (30, 900)];
        for _ in 0..50 {
            for (attempt, upper_secs) in cases {
                assert!(default_jitter_delay(attempt).as_secs() <= upper_secs);
            }
        }
    }
}