Skip to main content

qml_rs/processing/
server.rs

1//! Background job server for managing job processing
2//!
//! This module contains the [`BackgroundJobServer`], which coordinates job
//! processing, spawns the worker tasks and background maintenance loops, and
//! handles the overall start/stop lifecycle.
5
6use chrono::Duration;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::task::JoinHandle;
10use tokio::time::{MissedTickBehavior, interval, sleep};
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, error, info, warn};
13
14use super::{
15    RetryPolicy, WorkerRegistry,
16    cleanup::{CleanupWorker, DEFAULT_CLEANUP_INTERVAL, DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL},
17    heartbeat::{DEFAULT_DEAD_SERVER_TIMEOUT, DEFAULT_HEARTBEAT_INTERVAL, HeartbeatWorker},
18    middleware::{JobMiddleware, TracingMiddleware},
19    processor::{JobProcessor, StateChangeHook},
20    recurring::RecurringJobPoller,
21    scheduler::JobScheduler,
22    worker::WorkerConfig,
23};
24use crate::core::{RecurringJob, ServerInfo};
25use crate::error::{QmlError, Result};
26use crate::storage::Storage;
27use crate::storage::prelude::*;
28
29/// Configuration for the background job server
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ServerConfig {
32    /// Server name identifier
33    pub server_name: String,
34    /// Number of worker threads to run
35    pub worker_count: usize,
36    /// Polling interval for checking new jobs
37    pub polling_interval: Duration,
38    /// Timeout for job execution
39    pub job_timeout: Duration,
40    /// Queues to process (empty means all queues)
41    pub queues: Vec<String>,
42    /// Whether the server should start automatically
43    pub auto_start: bool,
44    /// Maximum number of jobs to fetch per polling cycle
45    pub fetch_batch_size: usize,
46    /// Enable the job scheduler
47    pub enable_scheduler: bool,
48    /// Scheduler polling interval
49    pub scheduler_poll_interval: Duration,
50    /// Grace period given to in-flight workers after `stop()` cancels the
51    /// shutdown token. Workers that haven't completed their current job by
52    /// then are aborted and the jobs will need lock-expiry / stale-processing
53    /// recovery to be picked up again.
54    pub shutdown_timeout: Duration,
55    /// A `Processing` job is treated as stranded (and re-queued on startup)
56    /// once its `started_at` is older than this threshold. Default: 5 minutes.
57    ///
58    /// This should comfortably exceed the typical `job_timeout` so a worker
59    /// that's still alive isn't fighting with the recovery sweep.
60    pub stale_processing_after: Duration,
61    /// Enable the recurring-job poller that materializes due
62    /// [`RecurringJob`](crate::core::RecurringJob) templates.
63    pub enable_recurring: bool,
64    /// Poll interval for the recurring-job poller. Defaults to 5 seconds so
65    /// minute-granularity crons fire promptly without hammering storage.
66    pub recurring_poll_interval: Duration,
67    /// Enable the background cleanup worker that deletes rows whose
68    /// `expires_at` is in the past.
69    pub enable_cleanup: bool,
70    /// Interval between cleanup sweeps. Defaults to 1 minute.
71    pub cleanup_interval: Duration,
72    /// TTL stamped onto successfully-completed jobs. Defaults to 24 hours.
73    pub succeeded_ttl: Duration,
74    /// TTL stamped onto permanently-failed jobs. Defaults to 7 days.
75    pub failed_ttl: Duration,
76    /// Enable the server heartbeat + dead-peer reclaim worker (D1).
77    ///
78    /// When enabled, this server registers itself in the storage-level
79    /// server registry, bumps its `last_heartbeat` on every
80    /// `heartbeat_interval`, and periodically scans for peers whose
81    /// heartbeat has gone stale. Dead peers' in-flight `Processing` jobs
82    /// are actively reclaimed back to `Enqueued`.
83    ///
84    /// Disabled by default so single-server deployments don't pay for an
85    /// unused registry.
86    pub enable_heartbeat: bool,
87    /// How often to bump this server's `last_heartbeat` row. Defaults to
88    /// 10 seconds.
89    pub heartbeat_interval: Duration,
90    /// A peer is treated as dead once its `last_heartbeat` is older than
91    /// this threshold. Defaults to 60 seconds — should comfortably exceed
92    /// `heartbeat_interval` so brief slowdowns don't trigger reclaim.
93    pub dead_server_timeout: Duration,
94}
95
96impl Default for ServerConfig {
97    fn default() -> Self {
98        Self {
99            server_name: "qml-server".to_string(),
100            worker_count: 5,
101            polling_interval: Duration::seconds(1),
102            job_timeout: Duration::minutes(5),
103            queues: vec!["default".to_string()],
104            auto_start: true,
105            fetch_batch_size: 10,
106            enable_scheduler: true,
107            scheduler_poll_interval: Duration::seconds(30),
108            shutdown_timeout: Duration::seconds(30),
109            stale_processing_after: Duration::minutes(5),
110            enable_recurring: true,
111            recurring_poll_interval: Duration::seconds(5),
112            enable_cleanup: true,
113            cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
114            succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
115            failed_ttl: DEFAULT_FAILED_TTL,
116            enable_heartbeat: false,
117            heartbeat_interval: DEFAULT_HEARTBEAT_INTERVAL,
118            dead_server_timeout: DEFAULT_DEAD_SERVER_TIMEOUT,
119        }
120    }
121}
122
123impl ServerConfig {
124    /// Create a new server configuration
125    pub fn new(server_name: impl Into<String>) -> Self {
126        Self {
127            server_name: server_name.into(),
128            ..Default::default()
129        }
130    }
131
132    /// Set the number of workers
133    pub fn worker_count(mut self, count: usize) -> Self {
134        self.worker_count = count;
135        self
136    }
137
138    /// Set the polling interval
139    pub fn polling_interval(mut self, interval: Duration) -> Self {
140        self.polling_interval = interval;
141        self
142    }
143
144    /// Set the job timeout
145    pub fn job_timeout(mut self, timeout: Duration) -> Self {
146        self.job_timeout = timeout;
147        self
148    }
149
150    /// Set the queues to process
151    pub fn queues(mut self, queues: Vec<String>) -> Self {
152        self.queues = queues;
153        self
154    }
155
156    /// Set the fetch batch size
157    pub fn fetch_batch_size(mut self, size: usize) -> Self {
158        self.fetch_batch_size = size;
159        self
160    }
161
162    /// Enable or disable the scheduler
163    pub fn enable_scheduler(mut self, enable: bool) -> Self {
164        self.enable_scheduler = enable;
165        self
166    }
167
168    /// Set how long `stop()` waits for in-flight workers before aborting.
169    pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
170        self.shutdown_timeout = timeout;
171        self
172    }
173
174    /// Set the staleness threshold for re-queuing stranded `Processing` jobs
175    /// on startup.
176    pub fn stale_processing_after(mut self, threshold: Duration) -> Self {
177        self.stale_processing_after = threshold;
178        self
179    }
180
181    /// Enable or disable the recurring-job poller.
182    pub fn enable_recurring(mut self, enable: bool) -> Self {
183        self.enable_recurring = enable;
184        self
185    }
186
187    /// Set the recurring-job poll interval.
188    pub fn recurring_poll_interval(mut self, interval: Duration) -> Self {
189        self.recurring_poll_interval = interval;
190        self
191    }
192
193    /// Enable or disable the background cleanup worker.
194    pub fn enable_cleanup(mut self, enable: bool) -> Self {
195        self.enable_cleanup = enable;
196        self
197    }
198
199    /// Set the cleanup-worker sweep interval.
200    pub fn cleanup_interval(mut self, interval: Duration) -> Self {
201        self.cleanup_interval = interval;
202        self
203    }
204
205    /// Set the TTL stamped onto successfully-completed jobs.
206    pub fn succeeded_ttl(mut self, ttl: Duration) -> Self {
207        self.succeeded_ttl = ttl;
208        self
209    }
210
211    /// Set the TTL stamped onto permanently-failed jobs.
212    pub fn failed_ttl(mut self, ttl: Duration) -> Self {
213        self.failed_ttl = ttl;
214        self
215    }
216
217    /// Enable or disable the heartbeat + dead-peer reclaim worker.
218    pub fn enable_heartbeat(mut self, enable: bool) -> Self {
219        self.enable_heartbeat = enable;
220        self
221    }
222
223    /// Set the heartbeat bump interval.
224    pub fn heartbeat_interval(mut self, interval: Duration) -> Self {
225        self.heartbeat_interval = interval;
226        self
227    }
228
229    /// Set the peer-dead staleness threshold.
230    pub fn dead_server_timeout(mut self, timeout: Duration) -> Self {
231        self.dead_server_timeout = timeout;
232        self
233    }
234}
235
236/// Background job server that manages job processing
237pub struct BackgroundJobServer {
238    config: ServerConfig,
239    storage: Arc<dyn Storage>,
240    worker_registry: Arc<WorkerRegistry>,
241    retry_policy: RetryPolicy,
242    /// Middleware stack layered around every `worker.execute(&job, &ctx)`
243    /// call. Runs in registration order; the built-in
244    /// [`TracingMiddleware`] is installed by default so every execution
245    /// ships with a structured span. Replace via
246    /// [`BackgroundJobServer::with_middleware`] — the new stack replaces
247    /// the built-in entirely, so re-add `TracingMiddleware` if you still
248    /// want spans.
249    middleware: Vec<Arc<dyn JobMiddleware>>,
250    /// Optional observer fired after every persisted state transition.
251    /// Cloned into every per-worker [`JobProcessor`] on `start()`. Lives
252    /// on the server (not [`ServerConfig`]) because `Arc<dyn Fn…>` can't
253    /// participate in `Serialize`/`Deserialize` — same reasoning as the
254    /// middleware field.
255    on_state_change: Option<StateChangeHook>,
256    is_running: Arc<tokio::sync::RwLock<bool>>,
257    /// Parent cancellation token for the running instance. Cancelling it
258    /// tells every worker loop (and the scheduler loop) to drain cleanly.
259    /// Each `start()` installs a fresh token so a subsequent restart starts
260    /// from an uncancelled state.
261    shutdown_token: Arc<tokio::sync::Mutex<CancellationToken>>,
262    worker_handles: Arc<tokio::sync::Mutex<Vec<JoinHandle<()>>>>,
263    /// Unique id this server registered under when heartbeats are enabled.
264    /// Populated on `start()` (`{server_name}#{uuid}`) and consumed by
265    /// `stop()` to deregister the row. `None` when heartbeats are off or
266    /// between start cycles.
267    server_id: Arc<tokio::sync::Mutex<Option<String>>>,
268}
269
270impl BackgroundJobServer {
271    /// Create a new background job server
272    pub fn new(
273        config: ServerConfig,
274        storage: Arc<dyn Storage>,
275        worker_registry: Arc<WorkerRegistry>,
276    ) -> Self {
277        Self {
278            config,
279            storage,
280            worker_registry,
281            retry_policy: RetryPolicy::default(),
282            middleware: vec![Arc::new(TracingMiddleware)],
283            on_state_change: None,
284            is_running: Arc::new(tokio::sync::RwLock::new(false)),
285            shutdown_token: Arc::new(tokio::sync::Mutex::new(CancellationToken::new())),
286            worker_handles: Arc::new(tokio::sync::Mutex::new(Vec::new())),
287            server_id: Arc::new(tokio::sync::Mutex::new(None)),
288        }
289    }
290
291    /// Replace the middleware stack that wraps `worker.execute` in every
292    /// worker thread. Runs in registration order — the first entry is the
293    /// outermost layer.
294    ///
295    /// The default stack is `[TracingMiddleware]`; calling this replaces
296    /// it entirely. Re-add [`TracingMiddleware`] yourself if you still
297    /// want structured spans around every execution.
298    ///
299    /// Must be called before [`BackgroundJobServer::start`] — changes made
300    /// after a running server has spawned its worker threads won't affect
301    /// already-started processors.
302    pub fn with_middleware(mut self, middleware: Vec<Arc<dyn JobMiddleware>>) -> Self {
303        self.middleware = middleware;
304        self
305    }
306
307    /// Install a state-change hook fired after every persisted job state
308    /// transition driven by the processor. See [`StateChangeHook`] for
309    /// semantics — the hook runs synchronously inside `process_job`, so
310    /// keep it non-blocking.
311    ///
312    /// The hook is cloned into every per-worker [`JobProcessor`] when
313    /// [`BackgroundJobServer::start`] spawns workers, so callers must
314    /// install it before `start()`.
315    pub fn with_state_change_hook(mut self, hook: StateChangeHook) -> Self {
316        self.on_state_change = Some(hook);
317        self
318    }
319
320    /// Create a new background job server with custom retry policy
321    pub fn with_retry_policy(
322        config: ServerConfig,
323        storage: Arc<dyn Storage>,
324        worker_registry: Arc<WorkerRegistry>,
325        retry_policy: RetryPolicy,
326    ) -> Self {
327        let mut server = Self::new(config, storage, worker_registry);
328        server.retry_policy = retry_policy;
329        server
330    }
331
332    /// Start the background job server
333    pub async fn start(&self) -> Result<()> {
334        let mut is_running = self.is_running.write().await;
335        if *is_running {
336            return Err(QmlError::ConfigurationError {
337                message: "Server is already running".to_string(),
338            });
339        }
340
341        info!(
342            "Starting background job server '{}' with {} workers",
343            self.config.server_name, self.config.worker_count
344        );
345
346        // Re-queue any jobs left in `Processing` by a previous instance that
347        // crashed or was aborted mid-shutdown. Without this, stranded jobs
348        // would only be rescued when their lock expired (up to 30 minutes).
349        let stale_before = chrono::Utc::now() - self.config.stale_processing_after;
350        match self.storage.requeue_stranded_jobs(stale_before).await {
351            Ok(0) => {}
352            Ok(n) => info!("Recovered {} stranded Processing job(s) on startup", n),
353            Err(e) => warn!(
354                "Failed to recover stranded Processing jobs on startup: {}",
355                e
356            ),
357        }
358
359        // Fresh shutdown token so a restart isn't born cancelled.
360        let shutdown_token = CancellationToken::new();
361        *self.shutdown_token.lock().await = shutdown_token.clone();
362
363        // Derive a unique `server_id` when heartbeats are enabled. This id
364        // is what gets stamped into `JobState::Processing::server_name`
365        // (via `WorkerConfig::server_name`), and it's what peer reclaim
366        // matches on. Without the UUID suffix, two running instances
367        // sharing a `server_name` would reclaim each other's work.
368        let server_identity = if self.config.enable_heartbeat {
369            let id = format!("{}#{}", self.config.server_name, uuid::Uuid::new_v4());
370            let info = ServerInfo::new(
371                id.clone(),
372                &self.config.server_name,
373                self.config.worker_count as u32,
374                self.config.queues.clone(),
375            );
376            self.storage
377                .register_server(&info)
378                .await
379                .map_err(|e| QmlError::StorageError {
380                    message: format!("Failed to register server heartbeat: {}", e),
381                })?;
382            *self.server_id.lock().await = Some(id.clone());
383            Some(id)
384        } else {
385            *self.server_id.lock().await = None;
386            None
387        };
388
389        *is_running = true;
390        drop(is_running);
391
392        // Start scheduler if enabled
393        if self.config.enable_scheduler {
394            let scheduler = JobScheduler::with_poll_interval(
395                self.storage.clone(),
396                self.config.scheduler_poll_interval,
397            );
398            let scheduler_cancel = shutdown_token.clone();
399
400            let scheduler_handle = tokio::spawn(async move {
401                if let Err(e) = scheduler.run_until_cancelled(scheduler_cancel).await {
402                    error!("Scheduler error: {}", e);
403                }
404            });
405
406            self.worker_handles.lock().await.push(scheduler_handle);
407        }
408
409        // Start recurring-job poller if enabled
410        if self.config.enable_recurring {
411            let poller =
412                RecurringJobPoller::new(self.storage.clone(), self.config.recurring_poll_interval);
413            let cancel = shutdown_token.clone();
414            let handle = tokio::spawn(async move {
415                if let Err(e) = poller.run_until_cancelled(cancel).await {
416                    error!("Recurring poller error: {}", e);
417                }
418            });
419            self.worker_handles.lock().await.push(handle);
420        }
421
422        // Start cleanup worker if enabled
423        if self.config.enable_cleanup {
424            let cleanup = CleanupWorker::new(self.storage.clone(), self.config.cleanup_interval);
425            let cancel = shutdown_token.clone();
426            let handle = tokio::spawn(async move {
427                if let Err(e) = cleanup.run_until_cancelled(cancel).await {
428                    error!("Cleanup worker error: {}", e);
429                }
430            });
431            self.worker_handles.lock().await.push(handle);
432        }
433
434        // Start heartbeat worker if enabled. Must come after the registry
435        // row is inserted by `register_server` above so the first bump
436        // finds a row.
437        if let Some(ref id) = server_identity {
438            let heartbeat = HeartbeatWorker::new(
439                self.storage.clone(),
440                id.clone(),
441                self.config.heartbeat_interval,
442                self.config.dead_server_timeout,
443            );
444            let cancel = shutdown_token.clone();
445            let handle = tokio::spawn(async move {
446                if let Err(e) = heartbeat.run_until_cancelled(cancel).await {
447                    error!("Heartbeat worker error: {}", e);
448                }
449            });
450            self.worker_handles.lock().await.push(handle);
451        }
452
453        // Start worker threads. When heartbeats are on, the unique
454        // `server_identity` is what gets stamped into
455        // `JobState::Processing::server_name`; otherwise fall back to the
456        // configured `server_name`.
457        let stamped_name = server_identity
458            .clone()
459            .unwrap_or_else(|| self.config.server_name.clone());
460        self.start_workers(shutdown_token, stamped_name).await?;
461
462        info!("Background job server started successfully");
463        Ok(())
464    }
465
466    /// Stop the background job server.
467    ///
468    /// Cancels the shutdown token so every worker drops out of its polling
469    /// loop after finishing its current job, then waits up to
470    /// `config.shutdown_timeout` for all tasks to join. Any task still
471    /// running past the timeout is aborted — those jobs will need
472    /// stale-processing recovery on next startup.
473    pub async fn stop(&self) -> Result<()> {
474        let mut is_running = self.is_running.write().await;
475        if !*is_running {
476            return Ok(());
477        }
478
479        info!(
480            "Stopping background job server '{}'",
481            self.config.server_name
482        );
483
484        self.shutdown_token.lock().await.cancel();
485        *is_running = false;
486        drop(is_running);
487
488        // Deregister our heartbeat row (if any) before waiting on tasks,
489        // so a peer scanning during our shutdown grace window doesn't
490        // briefly see us as alive with no running loop.
491        if let Some(id) = self.server_id.lock().await.take()
492            && let Err(e) = self.storage.deregister_server(&id).await
493        {
494            warn!("Failed to deregister server '{}' on stop: {}", id, e);
495        }
496
497        let handles = {
498            let mut guard = self.worker_handles.lock().await;
499            std::mem::take(&mut *guard)
500        };
501        let abort_handles: Vec<_> = handles.iter().map(|h| h.abort_handle()).collect();
502
503        let shutdown_timeout = self
504            .config
505            .shutdown_timeout
506            .to_std()
507            .unwrap_or(std::time::Duration::from_secs(30));
508
509        let join_all = async {
510            for handle in handles {
511                let _ = handle.await;
512            }
513        };
514
515        match tokio::time::timeout(shutdown_timeout, join_all).await {
516            Ok(()) => info!("Background job server stopped cleanly"),
517            Err(_) => {
518                warn!(
519                    "Shutdown grace period of {:?} elapsed; aborting {} remaining task(s)",
520                    shutdown_timeout,
521                    abort_handles.len()
522                );
523                for handle in &abort_handles {
524                    handle.abort();
525                }
526            }
527        }
528
529        Ok(())
530    }
531
532    /// Check if the server is running
533    pub async fn is_running(&self) -> bool {
534        *self.is_running.read().await
535    }
536
537    /// Get server configuration
538    pub fn config(&self) -> &ServerConfig {
539        &self.config
540    }
541
542    /// Register (or update) a recurring job template.
543    ///
544    /// `id` uniquely identifies this template — calling again with the same
545    /// `id` replaces the previous definition. `cron` is a 6-field
546    /// cron expression (second minute hour day month day-of-week) parsed by
547    /// the `cron` crate. The template is stored via
548    /// [`Storage::upsert_recurring_job`] and the running
549    /// [`RecurringJobPoller`] will materialize it into a normal [`Job`] the
550    /// next time `next_run_at` is in the past.
551    pub async fn schedule_recurring(
552        &self,
553        id: impl Into<String>,
554        cron: impl Into<String>,
555        method: impl Into<String>,
556        payload: serde_json::Value,
557        queue: impl Into<String>,
558    ) -> Result<()> {
559        let recurring = RecurringJob::new(id, cron, method, payload, queue)?;
560        self.storage
561            .upsert_recurring_job(&recurring)
562            .await
563            .map_err(|e| QmlError::StorageError {
564                message: format!("Failed to upsert recurring job: {}", e),
565            })
566    }
567
568    /// Remove a recurring job template by id. Returns `true` if a row was
569    /// deleted, `false` if no template with that id existed.
570    pub async fn remove_recurring(&self, id: &str) -> Result<bool> {
571        self.storage
572            .remove_recurring_job(id)
573            .await
574            .map_err(|e| QmlError::StorageError {
575                message: format!("Failed to remove recurring job: {}", e),
576            })
577    }
578
579    /// Start worker threads.
580    ///
581    /// `stamped_server_name` is what each worker writes into
582    /// [`JobState::Processing::server_name`](crate::core::JobState::Processing)
583    /// when it claims a job. When heartbeats are enabled it's the unique
584    /// `server_id` (`{server_name}#{uuid}`); otherwise it's the configured
585    /// `server_name`.
586    async fn start_workers(
587        &self,
588        shutdown_token: CancellationToken,
589        stamped_server_name: String,
590    ) -> Result<()> {
591        let mut handles = self.worker_handles.lock().await;
592
593        for worker_id in 0..self.config.worker_count {
594            let worker_config =
595                WorkerConfig::new(format!("{}:worker:{}", self.config.server_name, worker_id))
596                    .server_name(&stamped_server_name)
597                    .queues(self.config.queues.clone())
598                    .job_timeout(self.config.job_timeout)
599                    .polling_interval(self.config.polling_interval);
600
601            // Each worker gets a child token: cancelling the parent cancels
602            // every child, and individual child cancellations (e.g. from
603            // timeout) don't affect siblings.
604            let worker_cancel = shutdown_token.child_token();
605
606            let mut processor = JobProcessor::with_retry_policy(
607                self.worker_registry.clone(),
608                self.storage.clone(),
609                worker_config,
610                self.retry_policy.clone(),
611            )
612            .with_cancellation(worker_cancel.clone())
613            .with_ttls(self.config.succeeded_ttl, self.config.failed_ttl)
614            .with_middleware(self.middleware.clone());
615
616            if let Some(hook) = &self.on_state_change {
617                processor = processor.with_state_change_hook(hook.clone());
618            }
619
620            let storage_clone = self.storage.clone();
621            let config_clone = self.config.clone();
622
623            let handle = tokio::spawn(async move {
624                Self::worker_loop(processor, storage_clone, config_clone, worker_cancel).await;
625            });
626
627            handles.push(handle);
628        }
629
630        info!("Started {} worker threads", self.config.worker_count);
631        Ok(())
632    }
633
634    /// Main worker loop for processing jobs.
635    ///
636    /// Polls storage for jobs on `config.polling_interval`. On every tick the
637    /// loop first checks whether the shutdown token was cancelled — if so,
638    /// the loop exits before starting a new job. Jobs that are already in
639    /// flight are *not* interrupted; they run to completion and the loop
640    /// exits after the current call returns.
641    async fn worker_loop(
642        processor: JobProcessor,
643        storage: Arc<dyn Storage>,
644        config: ServerConfig,
645        cancel: CancellationToken,
646    ) {
647        debug!("Worker thread started");
648
649        let mut interval = interval(
650            config
651                .polling_interval
652                .to_std()
653                .unwrap_or(std::time::Duration::from_secs(1)),
654        );
655        // Default `Burst` makes a slow worker (one whose process_job ran
656        // longer than `polling_interval`) try to "catch up" by firing
657        // every missed tick back-to-back. That just hammers storage with
658        // queries the worker can't keep up with anyway. `Skip` collapses
659        // backed-up ticks into one — the right shape for a poller.
660        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
661
662        loop {
663            tokio::select! {
664                biased;
665                _ = cancel.cancelled() => break,
666                _ = interval.tick() => {}
667            }
668
669            // Fetch and lock an available job for this worker
670            let queue_filter = if config.queues.is_empty() {
671                None
672            } else {
673                Some(config.queues.as_slice())
674            };
675
676            match storage
677                .fetch_and_lock_job(processor.get_worker_id(), queue_filter)
678                .await
679            {
680                Ok(Some(job)) => {
681                    debug!("Fetched job {} for processing", job.id);
682
683                    // Process the job. We deliberately don't race this against
684                    // the cancellation token — cooperative cancellation is the
685                    // worker impl's responsibility via `WorkerContext::cancel`.
686                    if let Err(e) = processor.process_job(job).await {
687                        error!("Error processing job: {}", e);
688                    }
689                }
690                Ok(None) => {
691                    // No jobs available, continue polling
692                }
693                Err(e) => {
694                    error!("Error fetching jobs: {}", e);
695                    // Back off on error, but remain cancellable during the nap.
696                    // Add jitter so a fleet of workers all observing the same
697                    // transient backend failure doesn't resume in lock-step
698                    // and stampede the recovering backend on every tick.
699                    let jitter_ms = fastrand::u64(0..1500);
700                    let backoff = std::time::Duration::from_millis(5_000 + jitter_ms);
701                    tokio::select! {
702                        _ = cancel.cancelled() => break,
703                        _ = sleep(backoff) => {}
704                    }
705                }
706            }
707        }
708
709        debug!("Worker thread stopped");
710    }
711}
712
713#[cfg(test)]
714mod tests {
715    use super::*;
716    use crate::processing::{Worker, WorkerContext, WorkerResult};
717    use crate::storage::{MemoryStorage, MonitoringApi};
718    use async_trait::async_trait;
719    use std::sync::atomic::{AtomicUsize, Ordering};
720
721    struct TestWorker {
722        method: String,
723        call_count: Arc<AtomicUsize>,
724    }
725
726    impl TestWorker {
727        fn new(method: &str) -> Self {
728            Self {
729                method: method.to_string(),
730                call_count: Arc::new(AtomicUsize::new(0)),
731            }
732        }
733
734        #[allow(dead_code)]
735        fn call_count(&self) -> usize {
736            self.call_count.load(Ordering::Relaxed)
737        }
738    }
739
740    #[async_trait]
741    impl Worker for TestWorker {
742        async fn execute(
743            &self,
744            _job: &crate::core::Job,
745            _context: &WorkerContext,
746        ) -> Result<WorkerResult> {
747            self.call_count.fetch_add(1, Ordering::Relaxed);
748            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
749            Ok(WorkerResult::success(
750                Some("Test completed".to_string()),
751                10,
752            ))
753        }
754
755        fn method_name(&self) -> &str {
756            &self.method
757        }
758    }
759
760    #[tokio::test]
761    async fn test_server_start_stop() {
762        let storage = Arc::new(MemoryStorage::new());
763        let mut registry = WorkerRegistry::new();
764        registry.register(TestWorker::new("test_method"));
765        let registry = Arc::new(registry);
766
767        let config = ServerConfig::new("test-server")
768            .worker_count(2)
769            .polling_interval(Duration::milliseconds(100))
770            .enable_scheduler(false);
771
772        let server = BackgroundJobServer::new(config, storage, registry);
773
774        // Start server
775        server.start().await.unwrap();
776        assert!(server.is_running().await);
777
778        // Stop server
779        server.stop().await.unwrap();
780        assert!(!server.is_running().await);
781    }
782
783    /// Regression test for S1: `stop()` must let an in-flight job finish
784    /// instead of aborting it immediately. A 500ms job kicked off right
785    /// before `stop()` should end up in `Succeeded`, not stranded in
786    /// `Processing`.
787    #[tokio::test]
788    async fn stop_waits_for_inflight_job_to_complete() {
789        struct SlowWorker {
790            done: Arc<AtomicUsize>,
791        }
792
793        #[async_trait]
794        impl Worker for SlowWorker {
795            async fn execute(
796                &self,
797                _job: &crate::core::Job,
798                _ctx: &WorkerContext,
799            ) -> Result<WorkerResult> {
800                tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
801                self.done.fetch_add(1, Ordering::Relaxed);
802                Ok(WorkerResult::success(None, 500))
803            }
804
805            fn method_name(&self) -> &str {
806                "slow_method"
807            }
808        }
809
810        let storage = Arc::new(MemoryStorage::new());
811        let done = Arc::new(AtomicUsize::new(0));
812
813        let mut registry = WorkerRegistry::new();
814        registry.register(SlowWorker { done: done.clone() });
815        let registry = Arc::new(registry);
816
817        let config = ServerConfig::new("s1-test")
818            .worker_count(1)
819            .polling_interval(Duration::milliseconds(10))
820            .enable_scheduler(false)
821            .shutdown_timeout(Duration::seconds(5));
822
823        let server = BackgroundJobServer::new(config, storage.clone(), registry);
824
825        let job = crate::core::Job::new("slow_method", serde_json::Value::Null);
826        let job_id = job.id.clone();
827        storage.enqueue(&job).await.unwrap();
828
829        server.start().await.unwrap();
830
831        // Wait long enough for the worker to grab the job, then stop
832        // while it's still running.
833        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
834        server.stop().await.unwrap();
835
836        // After stop() returns, the job must be Succeeded — not stranded
837        // in Processing — and the worker must have observed completion.
838        assert_eq!(done.load(Ordering::Relaxed), 1, "worker should complete");
839        let final_job = storage.get(&job_id).await.unwrap().unwrap();
840        assert!(
841            matches!(final_job.state, crate::core::JobState::Succeeded { .. }),
842            "job should be Succeeded after graceful stop, got {:?}",
843            final_job.state
844        );
845    }
846
847    /// Regression test for S2: the cancellation token on `WorkerContext`
848    /// must be cancelled when the server shuts down, so a cooperative
849    /// worker impl can drop out early.
850    #[tokio::test]
851    async fn worker_context_cancel_token_fires_on_stop() {
852        use tokio::sync::Notify;
853
854        struct CancellableWorker {
855            observed_cancel: Arc<AtomicUsize>,
856            started: Arc<Notify>,
857        }
858
859        #[async_trait]
860        impl Worker for CancellableWorker {
861            async fn execute(
862                &self,
863                _job: &crate::core::Job,
864                ctx: &WorkerContext,
865            ) -> Result<WorkerResult> {
866                self.started.notify_one();
867                tokio::select! {
868                    _ = ctx.cancel.cancelled() => {
869                        self.observed_cancel.fetch_add(1, Ordering::Relaxed);
870                        Ok(WorkerResult::success(None, 0))
871                    }
872                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
873                        Ok(WorkerResult::success(None, 10_000))
874                    }
875                }
876            }
877
878            fn method_name(&self) -> &str {
879                "cancellable"
880            }
881        }
882
883        let storage = Arc::new(MemoryStorage::new());
884        let observed_cancel = Arc::new(AtomicUsize::new(0));
885        let started = Arc::new(Notify::new());
886
887        let mut registry = WorkerRegistry::new();
888        registry.register(CancellableWorker {
889            observed_cancel: observed_cancel.clone(),
890            started: started.clone(),
891        });
892        let registry = Arc::new(registry);
893
894        let config = ServerConfig::new("s2-test")
895            .worker_count(1)
896            .polling_interval(Duration::milliseconds(10))
897            .enable_scheduler(false)
898            .shutdown_timeout(Duration::seconds(5));
899
900        let server = BackgroundJobServer::new(config, storage.clone(), registry);
901
902        let job = crate::core::Job::new("cancellable", serde_json::Value::Null);
903        storage.enqueue(&job).await.unwrap();
904
905        server.start().await.unwrap();
906        // Wait until the worker has actually entered `execute`.
907        started.notified().await;
908
909        server.stop().await.unwrap();
910        assert_eq!(
911            observed_cancel.load(Ordering::Relaxed),
912            1,
913            "worker should have observed its cancel token firing"
914        );
915    }
916
917    /// Regression test for S3: `start()` must sweep stale `Processing`
918    /// jobs left behind by a previous instance back to `Enqueued`.
919    #[tokio::test]
920    async fn start_recovers_stranded_processing_jobs() {
921        let storage = Arc::new(MemoryStorage::new());
922
923        // Seed a job stuck in Processing with a very old started_at,
924        // simulating a crashed worker from a previous server instance.
925        let mut stranded = crate::core::Job::new("noop", serde_json::Value::Null);
926        stranded.state = crate::core::JobState::Processing {
927            started_at: chrono::Utc::now() - Duration::hours(1),
928            worker_id: "dead-worker".to_string(),
929            server_name: "dead-server".to_string(),
930        };
931        let stranded_id = stranded.id.clone();
932        storage.enqueue(&stranded).await.unwrap();
933
934        let mut registry = WorkerRegistry::new();
935        registry.register(TestWorker::new("noop"));
936        let registry = Arc::new(registry);
937
938        let config = ServerConfig::new("s3-test")
939            .worker_count(0) // no workers — we only want the startup sweep
940            .enable_scheduler(false)
941            .stale_processing_after(Duration::minutes(5));
942
943        let server = BackgroundJobServer::new(config, storage.clone(), registry);
944        server.start().await.unwrap();
945        // Give start() a moment to finish the sweep.
946        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
947        server.stop().await.unwrap();
948
949        let recovered = storage.get(&stranded_id).await.unwrap().unwrap();
950        assert!(
951            matches!(recovered.state, crate::core::JobState::Enqueued { .. }),
952            "stranded job should have been requeued, got {:?}",
953            recovered.state
954        );
955    }
956
957    #[tokio::test]
958    async fn test_job_processing() {
959        let storage = Arc::new(MemoryStorage::new());
960        let worker = TestWorker::new("test_method");
961        let call_count = worker.call_count.clone();
962
963        let mut registry = WorkerRegistry::new();
964        registry.register(worker);
965        let registry = Arc::new(registry);
966
967        let config = ServerConfig::new("test-server")
968            .worker_count(1)
969            .polling_interval(Duration::milliseconds(10))
970            .fetch_batch_size(1)
971            .enable_scheduler(false);
972
973        let server = BackgroundJobServer::new(config, storage.clone(), registry);
974
975        // Enqueue a test job
976        let job = crate::core::Job::new("test_method", serde_json::json!(["arg1".to_string()]));
977        storage.enqueue(&job).await.unwrap();
978
979        // Start server
980        server.start().await.unwrap();
981
982        // Wait for job to be processed
983        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
984
985        // Stop server
986        server.stop().await.unwrap();
987
988        // Check that the job was processed
989        assert!(call_count.load(Ordering::Relaxed) > 0);
990    }
991}