Skip to main content

qml_rs/processing/
processor.rs

1//! Job processor for executing individual jobs
2//!
3//! This module contains the JobProcessor that handles the execution lifecycle
4//! of individual jobs, including state transitions and retry logic.
5
6use chrono::{Duration, Utc};
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9use tracing::{debug, error, info, warn};
10
11use super::{
12    WorkerConfig, WorkerRegistry, WorkerResult,
13    cleanup::{DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL},
14    middleware::{self, JobMiddleware, TracingMiddleware},
15    retry::RetryPolicy,
16    worker::WorkerContext,
17};
18use crate::core::{Job, JobState};
19use crate::error::{QmlError, Result};
20use crate::storage::Storage;
21use crate::storage::prelude::*;
22
23/// Default middleware stack installed on every `JobProcessor` — just the
24/// built-in tracing span wrapper. Callers can replace this via
25/// [`JobProcessor::with_middleware`].
26fn default_middleware() -> Vec<Arc<dyn JobMiddleware>> {
27    vec![Arc::new(TracingMiddleware)]
28}
29
30/// Callback invoked after every persisted job state transition driven by
31/// the processor. Receives the job (with its new state already applied),
32/// the previous state, and the new state.
33///
34/// Held as an `Arc<dyn Fn…>` so one hook can be cloned across every worker
35/// thread without additional allocation. The callback runs synchronously
36/// inside `process_job`; keep it non-blocking — offload anything
37/// expensive to a channel or a spawned task.
38///
39/// Fires exactly once per persisted transition, *after* the in-memory
40/// `Job` has its new state and *before* `storage.update(...)`. The
41/// intermediate `Failed` step the state machine forces between
42/// `Processing` and `AwaitingRetry` is **not** observed — the hook sees
43/// the logical transition from the pre-retry state directly to
44/// `AwaitingRetry`, matching what storage ends up holding.
45pub type StateChangeHook = Arc<dyn Fn(&Job, &JobState, &JobState) + Send + Sync>;
46
/// Executes individual jobs and drives their lifecycle: state
/// transitions, retry scheduling, permanent failure and TTL stamping.
pub struct JobProcessor {
    /// Lookup table from job method name to the worker that runs it.
    worker_registry: Arc<WorkerRegistry>,
    /// Backend every state change is written to via `update`.
    storage: Arc<dyn Storage>,
    /// Policy consulted to decide whether and when a failed attempt is
    /// retried.
    retry_policy: RetryPolicy,
    /// Per-worker settings (worker id, server name, job timeout) that are
    /// also cloned into every `WorkerContext`.
    worker_config: WorkerConfig,
    /// Cancellation token cloned into every `WorkerContext` this processor
    /// creates. The constructors install a fresh, unlinked token; replace
    /// it with [`JobProcessor::with_cancellation`] to wire in shutdown.
    cancel_token: CancellationToken,
    /// TTL added to "now" and stored in `expires_at` when a job
    /// transitions to `Succeeded`.
    succeeded_ttl: Duration,
    /// TTL added to "now" and stored in `expires_at` when a job is failed
    /// permanently.
    failed_ttl: Duration,
    /// Middleware stack that wraps `worker.execute(&job, &ctx)`. Runs in
    /// registration order — the first entry is the outermost layer. The
    /// constructors install a stack containing only the built-in
    /// [`TracingMiddleware`]; [`JobProcessor::with_middleware`] replaces
    /// the whole stack.
    middleware: Vec<Arc<dyn JobMiddleware>>,
    /// Optional observer fired on every state transition the processor
    /// applies. See [`StateChangeHook`] for semantics. `None` by default,
    /// in which case each transition costs only the `None` check.
    on_state_change: Option<StateChangeHook>,
}
75
76impl JobProcessor {
77    /// Create a new job processor
78    pub fn new(
79        worker_registry: Arc<WorkerRegistry>,
80        storage: Arc<dyn Storage>,
81        worker_config: WorkerConfig,
82    ) -> Self {
83        Self {
84            worker_registry,
85            storage,
86            retry_policy: RetryPolicy::default(),
87            worker_config,
88            cancel_token: CancellationToken::new(),
89            succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
90            failed_ttl: DEFAULT_FAILED_TTL,
91            middleware: default_middleware(),
92            on_state_change: None,
93        }
94    }
95
96    /// Create a new job processor with custom retry policy
97    pub fn with_retry_policy(
98        worker_registry: Arc<WorkerRegistry>,
99        storage: Arc<dyn Storage>,
100        worker_config: WorkerConfig,
101        retry_policy: RetryPolicy,
102    ) -> Self {
103        Self {
104            worker_registry,
105            storage,
106            retry_policy,
107            worker_config,
108            cancel_token: CancellationToken::new(),
109            succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
110            failed_ttl: DEFAULT_FAILED_TTL,
111            middleware: default_middleware(),
112            on_state_change: None,
113        }
114    }
115
116    /// Install a cancellation token that will be cloned into every
117    /// [`WorkerContext`] produced by this processor. Used by
118    /// `BackgroundJobServer` to wire cooperative shutdown through to worker
119    /// impls.
120    pub fn with_cancellation(mut self, cancel_token: CancellationToken) -> Self {
121        self.cancel_token = cancel_token;
122        self
123    }
124
125    /// Override the TTLs stamped onto `job.expires_at` when jobs reach a
126    /// final state. The `CleanupWorker` uses `expires_at` to drop rows
127    /// out-of-band.
128    pub fn with_ttls(mut self, succeeded_ttl: Duration, failed_ttl: Duration) -> Self {
129        self.succeeded_ttl = succeeded_ttl;
130        self.failed_ttl = failed_ttl;
131        self
132    }
133
134    /// Replace the middleware stack that wraps `worker.execute`. The
135    /// processor's built-in [`TracingMiddleware`] is dropped when you call
136    /// this — pass it in yourself (usually as the first entry) if you
137    /// still want structured spans around every job.
138    ///
139    /// Middleware runs in registration order: the first entry is the
140    /// outermost layer, the last is closest to the worker.
141    pub fn with_middleware(mut self, middleware: Vec<Arc<dyn JobMiddleware>>) -> Self {
142        self.middleware = middleware;
143        self
144    }
145
146    /// Install an observer fired after every persisted state transition.
147    /// See [`StateChangeHook`] for semantics.
148    pub fn with_state_change_hook(mut self, hook: StateChangeHook) -> Self {
149        self.on_state_change = Some(hook);
150        self
151    }
152
153    /// Apply a state transition: record the previous state, call
154    /// [`Job::set_state`] (which validates the transition), then — on
155    /// success — invoke the state-change hook. Storage is **not** touched
156    /// here; callers update storage themselves so the hook always fires
157    /// on the same transitions that end up persisted.
158    fn apply_state_change(&self, job: &mut Job, new_state: JobState) -> Result<()> {
159        let prev_state = job.state.clone();
160        job.set_state(new_state)?;
161        self.fire_state_change_hook(job, &prev_state);
162        Ok(())
163    }
164
165    /// Fire the state-change hook with an explicit previous state.
166    /// Factored out so retry path can span a two-step transition as a
167    /// single logical event (see [`JobProcessor::handle_job_retry`]).
168    fn fire_state_change_hook(&self, job: &Job, prev_state: &JobState) {
169        if let Some(hook) = &self.on_state_change {
170            hook(job, prev_state, &job.state);
171        }
172    }
173
174    /// Get the worker ID for this processor
175    pub fn get_worker_id(&self) -> &str {
176        &self.worker_config.worker_id
177    }
178
179    /// Process a single job
180    pub async fn process_job(&self, mut job: Job) -> Result<()> {
181        let job_id = job.id.clone();
182        let method = job.method.clone();
183
184        info!("Starting job processing: {} ({})", job_id, method);
185
186        // Record that we're taking another crack at this job. This runs before
187        // we fail for a missing worker so lookups still count against the retry
188        // budget.
189        job.attempt = job.attempt.saturating_add(1);
190
191        // Check if we have a worker for this job method
192        let worker = match self.worker_registry.get_worker(&method) {
193            Some(worker) => worker,
194            None => {
195                error!("No worker found for method: {}", method);
196                return self
197                    .fail_job_permanently(
198                        &mut job,
199                        format!("No worker registered for method: {}", method),
200                        None,
201                    )
202                    .await;
203            }
204        };
205
206        // Update job state to Processing (if not already)
207        if !matches!(job.state, JobState::Processing { .. }) {
208            let processing_state = JobState::processing(
209                &self.worker_config.worker_id,
210                &self.worker_config.server_name,
211            );
212
213            if let Err(e) = self.apply_state_change(&mut job, processing_state) {
214                error!("Failed to set job state to Processing: {}", e);
215                return Err(e);
216            }
217
218            // Save the updated state
219            if let Err(e) = self.storage.update(&job).await {
220                error!("Failed to update job state in storage: {}", e);
221                return Err(e.into());
222            }
223        }
224
225        // Create worker context
226        let context = if job.attempt > 1 {
227            let previous_exception = self.extract_previous_exception(&job);
228            WorkerContext::retry_from(self.worker_config.clone(), job.attempt, previous_exception)
229        } else {
230            WorkerContext::new(self.worker_config.clone())
231        }
232        .with_cancel(self.cancel_token.clone());
233
234        // Execute the job through the middleware stack. The terminal
235        // `worker.execute(&job, &context)` runs once every layer in
236        // `self.middleware` has called `next.run(...)`. An empty stack
237        // calls the worker directly.
238        let start_time = Utc::now();
239        let execution_result = match tokio::time::timeout(
240            self.worker_config.job_timeout.to_std().unwrap(),
241            middleware::run_stack(&self.middleware, worker, &job, &context),
242        )
243        .await
244        {
245            Ok(result) => result,
246            Err(_) => {
247                warn!(
248                    "Job {} timed out after {:?}",
249                    job_id, self.worker_config.job_timeout
250                );
251                return self.handle_job_timeout(&mut job).await;
252            }
253        };
254
255        let duration = (Utc::now() - start_time).num_milliseconds() as u64;
256
257        // Handle the execution result
258        match execution_result {
259            Ok(WorkerResult::Success {
260                result, metadata, ..
261            }) => {
262                info!("Job {} completed successfully in {}ms", job_id, duration);
263                self.complete_job_successfully(&mut job, result, duration, metadata)
264                    .await
265            }
266            Ok(WorkerResult::Retry {
267                error, retry_at, ..
268            }) => {
269                warn!("Job {} failed and will be retried: {}", job_id, error);
270                self.handle_job_retry(&mut job, error, retry_at).await
271            }
272            Ok(WorkerResult::Failure {
273                error, context: _, ..
274            }) => {
275                error!("Job {} failed permanently: {}", job_id, error);
276                self.fail_job_permanently(&mut job, error, None).await
277            }
278            Err(e) => {
279                error!("Job {} execution error: {}", job_id, e);
280                self.handle_execution_error(&mut job, e).await
281            }
282        }
283    }
284
285    /// Complete a job successfully
286    async fn complete_job_successfully(
287        &self,
288        job: &mut Job,
289        result: Option<String>,
290        duration_ms: u64,
291        metadata: std::collections::HashMap<String, String>,
292    ) -> Result<()> {
293        // Check if job is already in a final state
294        if job.state.is_final() {
295            debug!(
296                "Job {} is already in a final state, skipping success",
297                job.id
298            );
299            return Ok(());
300        }
301
302        let succeeded_state = JobState::succeeded(duration_ms, result);
303
304        if let Err(e) = self.apply_state_change(job, succeeded_state) {
305            error!("Failed to set job state to Succeeded: {}", e);
306            return Err(e);
307        }
308
309        // Stamp expiration so the out-of-band CleanupWorker can drop this
310        // row later without needing to re-evaluate its state.
311        job.expires_at = Some(Utc::now() + self.succeeded_ttl);
312
313        // Add execution metadata
314        for (key, value) in metadata {
315            job.add_metadata(format!("exec_{}", key), value);
316        }
317
318        // Update in storage
319        self.storage.update(job).await?;
320
321        Ok(())
322    }
323
324    /// Handle job retry
325    async fn handle_job_retry(
326        &self,
327        job: &mut Job,
328        error: String,
329        retry_at: Option<chrono::DateTime<Utc>>,
330    ) -> Result<()> {
331        // Check if job is already in a final state
332        if job.state.is_final() {
333            debug!("Job {} is already in a final state, skipping retry", job.id);
334            return Ok(());
335        }
336
337        // Check if we should retry based on policy
338        if !self.should_retry_attempt(job, None) {
339            debug!(
340                "Retry limit exceeded for job {}, failing permanently",
341                job.id
342            );
343            return self.fail_job_permanently(job, error, None).await;
344        }
345
346        // Save the pre-retry state for the state-change hook so
347        // observers see one logical transition (whatever-we-were-in →
348        // AwaitingRetry), not the intermediate Failed step that
349        // earlier revisions had to dance through.
350        let pre_retry_state = job.state.clone();
351
352        // Calculate retry time — the retry policy counts attempts starting from
353        // 1, and `job.attempt` already reflects the just-completed attempt, so
354        // pass it through unchanged.
355        let retry_time = retry_at
356            .or_else(|| self.retry_policy.calculate_retry_time(job.attempt))
357            .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));
358
359        // One-shot transition. The state machine permits
360        // `Processing → AwaitingRetry` directly (added when this code
361        // was simplified), so we no longer need the
362        // `Processing → Failed → AwaitingRetry` two-step which was
363        // fragile against a panic landing on the intermediate Failed.
364        let retry_state = JobState::awaiting_retry(retry_time, &error);
365        if let Err(e) = job.set_state(retry_state) {
366            error!("Failed to set job state to AwaitingRetry: {}", e);
367            return Err(e);
368        }
369
370        // Fire the hook with the saved pre-retry state so observers see
371        // the logical transition.
372        self.fire_state_change_hook(job, &pre_retry_state);
373
374        // Update in storage
375        self.storage.update(job).await?;
376
377        info!(
378            "Job {} scheduled for retry (attempt #{}) at {}",
379            job.id, job.attempt, retry_time
380        );
381        Ok(())
382    }
383
384    /// Fail a job permanently
385    async fn fail_job_permanently(
386        &self,
387        job: &mut Job,
388        error: String,
389        stack_trace: Option<String>,
390    ) -> Result<()> {
391        // Check if job is already in a final state
392        if job.state.is_final() {
393            debug!(
394                "Job {} is already in a final state, skipping failure",
395                job.id
396            );
397            return Ok(());
398        }
399
400        let failed_state = JobState::failed(error, stack_trace);
401
402        if let Err(e) = self.apply_state_change(job, failed_state) {
403            error!("Failed to set job state to Failed: {}", e);
404            return Err(e);
405        }
406
407        // Permanent failure is a final state; stamp expiration so the
408        // CleanupWorker can drop it after `failed_ttl`.
409        job.expires_at = Some(Utc::now() + self.failed_ttl);
410
411        // Update in storage
412        self.storage.update(job).await?;
413
414        error!(
415            "Job {} failed permanently after {} attempts",
416            job.id, job.attempt
417        );
418        Ok(())
419    }
420
421    /// Handle job timeout
422    async fn handle_job_timeout(&self, job: &mut Job) -> Result<()> {
423        let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);
424
425        if self.should_retry_attempt(job, Some("TimeoutError")) {
426            self.handle_job_retry(job, timeout_error, None).await
427        } else {
428            self.fail_job_permanently(job, timeout_error, None).await
429        }
430    }
431
432    /// Handle execution errors
433    async fn handle_execution_error(&self, job: &mut Job, error: QmlError) -> Result<()> {
434        let error_type = match &error {
435            QmlError::StorageError { .. } => "StorageError",
436            QmlError::WorkerError { .. } => "WorkerError",
437            QmlError::TimeoutError { .. } => "TimeoutError",
438            _ => "UnknownError",
439        };
440
441        let error_message = error.to_string();
442
443        if self.should_retry_attempt(job, Some(error_type)) {
444            self.handle_job_retry(job, error_message, None).await
445        } else {
446            self.fail_job_permanently(job, error_message, None).await
447        }
448    }
449
450    /// Determine whether another retry should be attempted for this job.
451    ///
452    /// `job.attempt` is the number of attempts completed so far (including the
453    /// one that just failed). The next run would be retry `#job.attempt`, so
454    /// both the job-level cap and the retry policy are checked against that
455    /// value.
456    fn should_retry_attempt(&self, job: &Job, exception_type: Option<&str>) -> bool {
457        if job.max_retries > 0 && job.attempt > job.max_retries {
458            return false;
459        }
460
461        self.retry_policy.should_retry(exception_type, job.attempt)
462    }
463
464    /// Extract previous exception from job state
465    fn extract_previous_exception(&self, job: &Job) -> Option<String> {
466        match &job.state {
467            JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
468            JobState::Failed { exception, .. } => Some(exception.clone()),
469            _ => None,
470        }
471    }
472}
473
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::JobStateKind;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::{MemoryStorage, MonitoringApi};
    use async_trait::async_trait;
    use chrono::Duration;
    use std::sync::{Arc, Mutex};

    /// Worker whose outcome is fixed at construction time: success, a
    /// retry request, or a permanent failure.
    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            let outcome = match (self.should_succeed, self.should_retry) {
                (true, _) => WorkerResult::success(Some("Test result".to_string()), 100),
                (false, true) => WorkerResult::retry("Test error".to_string(), None),
                (false, false) => WorkerResult::failure("Permanent failure".to_string()),
            };
            Ok(outcome)
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    /// Recorded `(previous kind, new kind)` pairs shared between a hook
    /// closure and the test body.
    type TransitionLog = Arc<Mutex<Vec<(JobStateKind, JobStateKind)>>>;

    /// Build in-memory storage plus a processor whose registry contains
    /// only `worker`. With `policy == None` the processor is built through
    /// `JobProcessor::new`; otherwise through
    /// `JobProcessor::with_retry_policy`.
    fn fixture(
        worker_id: &str,
        worker: TestWorker,
        policy: Option<RetryPolicy>,
    ) -> (Arc<MemoryStorage>, JobProcessor) {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(worker);
        let registry = Arc::new(registry);
        let config = WorkerConfig::new(worker_id);

        let processor = match policy {
            Some(policy) => {
                JobProcessor::with_retry_policy(registry, storage.clone(), config, policy)
            }
            None => JobProcessor::new(registry, storage.clone(), config),
        };
        (storage, processor)
    }

    /// Process `job` and return the copy storage holds afterwards.
    async fn process_and_reload(
        processor: &JobProcessor,
        storage: &MemoryStorage,
        job: Job,
    ) -> Job {
        let job_id = job.id.clone();
        processor.process_job(job).await.unwrap();
        storage.get(&job_id).await.unwrap().unwrap()
    }

    /// Put `job` back into its queue (in memory and in storage) so the
    /// processor can pick it up again immediately.
    async fn requeue(storage: &MemoryStorage, job: &mut Job) {
        job.set_state(JobState::enqueued(&job.queue)).unwrap();
        storage.update(job).await.unwrap();
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("test_method", true, false),
            None,
        );

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        storage.enqueue(&job).await.unwrap();

        let stored = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(stored.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("test_method", false, true),
            Some(policy),
        );

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        storage.enqueue(&job).await.unwrap();

        let stored = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(stored.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("test_method", false, false),
            None,
        );

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        storage.enqueue(&job).await.unwrap();

        let stored = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(stored.state, JobState::Failed { .. }));
    }

    #[tokio::test]
    async fn test_job_respects_retry_limit() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 1));
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("limited_retry_method", false, true),
            Some(policy),
        );

        let job = Job::new("limited_retry_method", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();

        // Attempt 1: the policy still allows a retry.
        let mut retry_job = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(retry_job.state, JobState::AwaitingRetry { .. }));
        assert_eq!(retry_job.attempt, 1);

        // Skip the retry delay by re-enqueuing by hand.
        requeue(&storage, &mut retry_job).await;

        // Attempt 2: the retry limit is hit, so the job fails for good.
        let final_job = process_and_reload(&processor, &storage, retry_job).await;
        assert!(matches!(final_job.state, JobState::Failed { .. }));
        assert_eq!(final_job.attempt, 2);
    }

    #[tokio::test]
    async fn test_job_respects_job_specific_max_retries() {
        // The policy is generous (5); the job's own limit of 1 must win.
        let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 5));
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("job_specific_limit", false, true),
            Some(policy),
        );

        let job = Job::with_config(
            "job_specific_limit",
            serde_json::Value::Null,
            "default",
            0,
            1,
        );
        storage.enqueue(&job).await.unwrap();

        // Attempt 1 schedules a retry.
        let mut retry_job = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(retry_job.state, JobState::AwaitingRetry { .. }));
        assert_eq!(retry_job.attempt, 1);

        requeue(&storage, &mut retry_job).await;

        // Attempt 2 exceeds the job-level cap.
        let final_job = process_and_reload(&processor, &storage, retry_job).await;
        assert!(matches!(final_job.state, JobState::Failed { .. }));
        assert_eq!(final_job.attempt, 2);
    }

    #[tokio::test]
    async fn failed_to_enqueued_to_failed_increments_attempt() {
        // Regression test for B3: a job that is manually re-enqueued after
        // a terminal failure must have its next run counted as a new
        // attempt — `job.attempt` advances by exactly one, neither reset
        // nor double-counted.
        let (storage, processor) = fixture(
            "test-worker",
            TestWorker::new("manual_retry_method", false, false),
            None,
        );

        let job = Job::new("manual_retry_method", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();

        // Attempt 1: the worker reports Failure, i.e. the permanent
        // failure path.
        let mut manual = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(manual.state, JobState::Failed { .. }));
        assert_eq!(manual.attempt, 1);

        // Manual retry: Failed → Enqueued is a legal transition.
        requeue(&storage, &mut manual).await;

        // Attempt 2: fails again and the counter moves on.
        let after_second = process_and_reload(&processor, &storage, manual).await;
        assert!(matches!(after_second.state, JobState::Failed { .. }));
        assert_eq!(after_second.attempt, 2);
    }

    // --- F2: state-change hook tests ---

    /// Attach a hook that records every `(prev_kind, new_kind)` pair it
    /// observes. The returned log is shared with the hook closure.
    fn install_recording_hook(processor: JobProcessor) -> (JobProcessor, TransitionLog) {
        let log: TransitionLog = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&log);
        let hook: StateChangeHook = Arc::new(move |_job, prev, new| {
            sink.lock().unwrap().push((prev.kind(), new.kind()));
        });
        (processor.with_state_change_hook(hook), log)
    }

    /// Process one job for `method` on a hook-instrumented processor and
    /// return the transitions the hook saw, in order.
    async fn observed_transitions(
        method: &str,
        worker: TestWorker,
        policy: Option<RetryPolicy>,
    ) -> Vec<(JobStateKind, JobStateKind)> {
        let (storage, processor) = fixture("hook-worker", worker, policy);
        let (processor, log) = install_recording_hook(processor);

        let job = Job::new(method, serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let seen = std::mem::take(&mut *log.lock().unwrap());
        seen
    }

    #[tokio::test]
    async fn state_change_hook_fires_for_successful_path() {
        let seen = observed_transitions(
            "hook_success",
            TestWorker::new("hook_success", true, false),
            None,
        )
        .await;

        assert_eq!(
            seen,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::Succeeded),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_skips_intermediate_failed_in_retry_path() {
        let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 3));
        let seen = observed_transitions(
            "hook_retry",
            TestWorker::new("hook_retry", false, true),
            Some(policy),
        )
        .await;

        // Logical transitions: Enqueued → Processing → AwaitingRetry.
        // No `Failed` event may show up between Processing and
        // AwaitingRetry.
        assert_eq!(
            seen,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::AwaitingRetry),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_fires_for_permanent_failure() {
        let seen = observed_transitions(
            "hook_fail",
            TestWorker::new("hook_fail", false, false),
            None,
        )
        .await;

        assert_eq!(
            seen,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::Failed),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_is_opt_in_default_is_no_op() {
        // Without a hook the processor must still run the job to
        // completion, with nothing observable beyond normal processing.
        let (storage, processor) =
            fixture("hook-worker", TestWorker::new("no_hook", true, false), None);

        let job = Job::new("no_hook", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();

        let final_job = process_and_reload(&processor, &storage, job).await;
        assert!(matches!(final_job.state, JobState::Succeeded { .. }));
    }
}