1use chrono::{Duration, Utc};
7use std::sync::Arc;
8use tokio_util::sync::CancellationToken;
9use tracing::{debug, error, info, warn};
10
11use super::{
12 WorkerConfig, WorkerRegistry, WorkerResult,
13 cleanup::{DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL},
14 middleware::{self, JobMiddleware, TracingMiddleware},
15 retry::RetryPolicy,
16 worker::WorkerContext,
17};
18use crate::core::{Job, JobState};
19use crate::error::{QmlError, Result};
20use crate::storage::Storage;
21use crate::storage::prelude::*;
22
23fn default_middleware() -> Vec<Arc<dyn JobMiddleware>> {
27 vec![Arc::new(TracingMiddleware)]
28}
29
/// Callback fired by `JobProcessor` after it successfully changes a job's state.
///
/// Arguments, in order:
/// 1. the job *after* the change has been applied,
/// 2. the state the job was in before the change,
/// 3. the new state (the same value as `job.state`).
///
/// The hook runs synchronously, inline with job processing, before the new
/// state is written to storage. It must be `Send + Sync` because the processor
/// may be shared across tasks.
pub type StateChangeHook = Arc<dyn Fn(&Job, &JobState, &JobState) + Send + Sync>;
46
/// Executes individual jobs: resolves the worker, drives the job through
/// `Processing` into `Succeeded`, `AwaitingRetry` or `Failed`, and persists
/// every transition it makes.
pub struct JobProcessor {
    /// Registry used to look up the worker for a job's `method`.
    worker_registry: Arc<WorkerRegistry>,
    /// Backend the processor writes each state change to via `update`.
    storage: Arc<dyn Storage>,
    /// Decides retry eligibility and the default retry time when the worker
    /// does not supply one.
    retry_policy: RetryPolicy,
    /// Identity (worker id / server name) and `job_timeout` for this processor.
    worker_config: WorkerConfig,
    /// Token cloned into every `WorkerContext` handed to workers.
    cancel_token: CancellationToken,
    /// How long a succeeded job is kept; added to "now" to set `expires_at`.
    succeeded_ttl: Duration,
    /// How long a permanently failed job is kept; added to "now" to set `expires_at`.
    failed_ttl: Duration,
    /// Middleware stack the worker call is wrapped in (`middleware::run_stack`).
    middleware: Vec<Arc<dyn JobMiddleware>>,
    /// Optional observer invoked after each state change made by this processor.
    on_state_change: Option<StateChangeHook>,
}
75
76impl JobProcessor {
77 pub fn new(
79 worker_registry: Arc<WorkerRegistry>,
80 storage: Arc<dyn Storage>,
81 worker_config: WorkerConfig,
82 ) -> Self {
83 Self {
84 worker_registry,
85 storage,
86 retry_policy: RetryPolicy::default(),
87 worker_config,
88 cancel_token: CancellationToken::new(),
89 succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
90 failed_ttl: DEFAULT_FAILED_TTL,
91 middleware: default_middleware(),
92 on_state_change: None,
93 }
94 }
95
96 pub fn with_retry_policy(
98 worker_registry: Arc<WorkerRegistry>,
99 storage: Arc<dyn Storage>,
100 worker_config: WorkerConfig,
101 retry_policy: RetryPolicy,
102 ) -> Self {
103 Self {
104 worker_registry,
105 storage,
106 retry_policy,
107 worker_config,
108 cancel_token: CancellationToken::new(),
109 succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
110 failed_ttl: DEFAULT_FAILED_TTL,
111 middleware: default_middleware(),
112 on_state_change: None,
113 }
114 }
115
116 pub fn with_cancellation(mut self, cancel_token: CancellationToken) -> Self {
121 self.cancel_token = cancel_token;
122 self
123 }
124
125 pub fn with_ttls(mut self, succeeded_ttl: Duration, failed_ttl: Duration) -> Self {
129 self.succeeded_ttl = succeeded_ttl;
130 self.failed_ttl = failed_ttl;
131 self
132 }
133
134 pub fn with_middleware(mut self, middleware: Vec<Arc<dyn JobMiddleware>>) -> Self {
142 self.middleware = middleware;
143 self
144 }
145
146 pub fn with_state_change_hook(mut self, hook: StateChangeHook) -> Self {
149 self.on_state_change = Some(hook);
150 self
151 }
152
153 fn apply_state_change(&self, job: &mut Job, new_state: JobState) -> Result<()> {
159 let prev_state = job.state.clone();
160 job.set_state(new_state)?;
161 self.fire_state_change_hook(job, &prev_state);
162 Ok(())
163 }
164
165 fn fire_state_change_hook(&self, job: &Job, prev_state: &JobState) {
169 if let Some(hook) = &self.on_state_change {
170 hook(job, prev_state, &job.state);
171 }
172 }
173
174 pub fn get_worker_id(&self) -> &str {
176 &self.worker_config.worker_id
177 }
178
179 pub async fn process_job(&self, mut job: Job) -> Result<()> {
181 let job_id = job.id.clone();
182 let method = job.method.clone();
183
184 info!("Starting job processing: {} ({})", job_id, method);
185
186 job.attempt = job.attempt.saturating_add(1);
190
191 let worker = match self.worker_registry.get_worker(&method) {
193 Some(worker) => worker,
194 None => {
195 error!("No worker found for method: {}", method);
196 return self
197 .fail_job_permanently(
198 &mut job,
199 format!("No worker registered for method: {}", method),
200 None,
201 )
202 .await;
203 }
204 };
205
206 if !matches!(job.state, JobState::Processing { .. }) {
208 let processing_state = JobState::processing(
209 &self.worker_config.worker_id,
210 &self.worker_config.server_name,
211 );
212
213 if let Err(e) = self.apply_state_change(&mut job, processing_state) {
214 error!("Failed to set job state to Processing: {}", e);
215 return Err(e);
216 }
217
218 if let Err(e) = self.storage.update(&job).await {
220 error!("Failed to update job state in storage: {}", e);
221 return Err(e.into());
222 }
223 }
224
225 let context = if job.attempt > 1 {
227 let previous_exception = self.extract_previous_exception(&job);
228 WorkerContext::retry_from(self.worker_config.clone(), job.attempt, previous_exception)
229 } else {
230 WorkerContext::new(self.worker_config.clone())
231 }
232 .with_cancel(self.cancel_token.clone());
233
234 let start_time = Utc::now();
239 let execution_result = match tokio::time::timeout(
240 self.worker_config.job_timeout.to_std().unwrap(),
241 middleware::run_stack(&self.middleware, worker, &job, &context),
242 )
243 .await
244 {
245 Ok(result) => result,
246 Err(_) => {
247 warn!(
248 "Job {} timed out after {:?}",
249 job_id, self.worker_config.job_timeout
250 );
251 return self.handle_job_timeout(&mut job).await;
252 }
253 };
254
255 let duration = (Utc::now() - start_time).num_milliseconds() as u64;
256
257 match execution_result {
259 Ok(WorkerResult::Success {
260 result, metadata, ..
261 }) => {
262 info!("Job {} completed successfully in {}ms", job_id, duration);
263 self.complete_job_successfully(&mut job, result, duration, metadata)
264 .await
265 }
266 Ok(WorkerResult::Retry {
267 error, retry_at, ..
268 }) => {
269 warn!("Job {} failed and will be retried: {}", job_id, error);
270 self.handle_job_retry(&mut job, error, retry_at).await
271 }
272 Ok(WorkerResult::Failure {
273 error, context: _, ..
274 }) => {
275 error!("Job {} failed permanently: {}", job_id, error);
276 self.fail_job_permanently(&mut job, error, None).await
277 }
278 Err(e) => {
279 error!("Job {} execution error: {}", job_id, e);
280 self.handle_execution_error(&mut job, e).await
281 }
282 }
283 }
284
285 async fn complete_job_successfully(
287 &self,
288 job: &mut Job,
289 result: Option<String>,
290 duration_ms: u64,
291 metadata: std::collections::HashMap<String, String>,
292 ) -> Result<()> {
293 if job.state.is_final() {
295 debug!(
296 "Job {} is already in a final state, skipping success",
297 job.id
298 );
299 return Ok(());
300 }
301
302 let succeeded_state = JobState::succeeded(duration_ms, result);
303
304 if let Err(e) = self.apply_state_change(job, succeeded_state) {
305 error!("Failed to set job state to Succeeded: {}", e);
306 return Err(e);
307 }
308
309 job.expires_at = Some(Utc::now() + self.succeeded_ttl);
312
313 for (key, value) in metadata {
315 job.add_metadata(format!("exec_{}", key), value);
316 }
317
318 self.storage.update(job).await?;
320
321 Ok(())
322 }
323
324 async fn handle_job_retry(
326 &self,
327 job: &mut Job,
328 error: String,
329 retry_at: Option<chrono::DateTime<Utc>>,
330 ) -> Result<()> {
331 if job.state.is_final() {
333 debug!("Job {} is already in a final state, skipping retry", job.id);
334 return Ok(());
335 }
336
337 if !self.should_retry_attempt(job, None) {
339 debug!(
340 "Retry limit exceeded for job {}, failing permanently",
341 job.id
342 );
343 return self.fail_job_permanently(job, error, None).await;
344 }
345
346 let pre_retry_state = job.state.clone();
351
352 let retry_time = retry_at
356 .or_else(|| self.retry_policy.calculate_retry_time(job.attempt))
357 .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));
358
359 let retry_state = JobState::awaiting_retry(retry_time, &error);
365 if let Err(e) = job.set_state(retry_state) {
366 error!("Failed to set job state to AwaitingRetry: {}", e);
367 return Err(e);
368 }
369
370 self.fire_state_change_hook(job, &pre_retry_state);
373
374 self.storage.update(job).await?;
376
377 info!(
378 "Job {} scheduled for retry (attempt #{}) at {}",
379 job.id, job.attempt, retry_time
380 );
381 Ok(())
382 }
383
384 async fn fail_job_permanently(
386 &self,
387 job: &mut Job,
388 error: String,
389 stack_trace: Option<String>,
390 ) -> Result<()> {
391 if job.state.is_final() {
393 debug!(
394 "Job {} is already in a final state, skipping failure",
395 job.id
396 );
397 return Ok(());
398 }
399
400 let failed_state = JobState::failed(error, stack_trace);
401
402 if let Err(e) = self.apply_state_change(job, failed_state) {
403 error!("Failed to set job state to Failed: {}", e);
404 return Err(e);
405 }
406
407 job.expires_at = Some(Utc::now() + self.failed_ttl);
410
411 self.storage.update(job).await?;
413
414 error!(
415 "Job {} failed permanently after {} attempts",
416 job.id, job.attempt
417 );
418 Ok(())
419 }
420
421 async fn handle_job_timeout(&self, job: &mut Job) -> Result<()> {
423 let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);
424
425 if self.should_retry_attempt(job, Some("TimeoutError")) {
426 self.handle_job_retry(job, timeout_error, None).await
427 } else {
428 self.fail_job_permanently(job, timeout_error, None).await
429 }
430 }
431
432 async fn handle_execution_error(&self, job: &mut Job, error: QmlError) -> Result<()> {
434 let error_type = match &error {
435 QmlError::StorageError { .. } => "StorageError",
436 QmlError::WorkerError { .. } => "WorkerError",
437 QmlError::TimeoutError { .. } => "TimeoutError",
438 _ => "UnknownError",
439 };
440
441 let error_message = error.to_string();
442
443 if self.should_retry_attempt(job, Some(error_type)) {
444 self.handle_job_retry(job, error_message, None).await
445 } else {
446 self.fail_job_permanently(job, error_message, None).await
447 }
448 }
449
450 fn should_retry_attempt(&self, job: &Job, exception_type: Option<&str>) -> bool {
457 if job.max_retries > 0 && job.attempt > job.max_retries {
458 return false;
459 }
460
461 self.retry_policy.should_retry(exception_type, job.attempt)
462 }
463
464 fn extract_previous_exception(&self, job: &Job) -> Option<String> {
466 match &job.state {
467 JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
468 JobState::Failed { exception, .. } => Some(exception.clone()),
469 _ => None,
470 }
471 }
472}
473
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::JobStateKind;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::{MemoryStorage, MonitoringApi};
    use async_trait::async_trait;
    use chrono::Duration;
    use std::sync::{Arc, Mutex};

    /// Ordered `(previous, new)` state kinds captured by the recording hook.
    type Transitions = Arc<Mutex<Vec<(JobStateKind, JobStateKind)>>>;

    /// Scripted worker. It succeeds when `should_succeed`, otherwise requests
    /// a retry when `should_retry`, otherwise fails permanently.
    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            if self.should_succeed {
                Ok(WorkerResult::success(Some("Test result".to_string()), 100))
            } else if self.should_retry {
                Ok(WorkerResult::retry("Test error".to_string(), None))
            } else {
                Ok(WorkerResult::failure("Permanent failure".to_string()))
            }
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    /// Builds a registry holding a single `TestWorker` for `method`.
    fn registry_with(
        method: &str,
        should_succeed: bool,
        should_retry: bool,
    ) -> Arc<WorkerRegistry> {
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new(method, should_succeed, should_retry));
        Arc::new(registry)
    }

    /// Puts a job back into `Enqueued` and persists it.
    ///
    /// This mimics what a scheduler or operator does before the next attempt.
    async fn requeue(storage: &MemoryStorage, job: &mut Job) {
        job.set_state(JobState::enqueued(&job.queue)).unwrap();
        storage.update(job).await.unwrap();
    }

    /// Installs a hook that records every state transition the processor applies.
    fn install_recording_hook(processor: JobProcessor) -> (JobProcessor, Transitions) {
        let transitions: Transitions = Arc::new(Mutex::new(Vec::new()));
        let captured = transitions.clone();
        let hook: StateChangeHook = Arc::new(move |_job, prev, new| {
            captured.lock().unwrap().push((prev.kind(), new.kind()));
        });
        (processor.with_state_change_hook(hook), transitions)
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("test_method", true, false);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("test_method", false, true);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 2));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("test_method", false, false);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Failed { .. }));
    }

    #[tokio::test]
    async fn test_job_respects_retry_limit() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("limited_retry_method", false, true);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 1));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("limited_retry_method", serde_json::Value::Null);
        let job_id = job.id.clone();
        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        let mut retry_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(retry_job.state, JobState::AwaitingRetry { .. }));
        assert_eq!(retry_job.attempt, 1);

        requeue(&storage, &mut retry_job).await;
        processor.process_job(retry_job).await.unwrap();

        let final_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(final_job.state, JobState::Failed { .. }));
        assert_eq!(final_job.attempt, 2);
    }

    #[tokio::test]
    async fn test_job_respects_job_specific_max_retries() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("job_specific_limit", false, true);

        let config = WorkerConfig::new("test-worker");
        // The policy alone would allow 5 retries; the job-level cap of 1 must win.
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 5));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::with_config(
            "job_specific_limit",
            serde_json::Value::Null,
            "default",
            0,
            1,
        );
        let job_id = job.id.clone();
        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        let mut retry_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(retry_job.state, JobState::AwaitingRetry { .. }));
        assert_eq!(retry_job.attempt, 1);

        requeue(&storage, &mut retry_job).await;
        processor.process_job(retry_job).await.unwrap();

        let final_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(final_job.state, JobState::Failed { .. }));
        assert_eq!(final_job.attempt, 2);
    }

    #[tokio::test]
    async fn failed_to_enqueued_to_failed_increments_attempt() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("manual_retry_method", false, false);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("manual_retry_method", serde_json::Value::Null);
        let job_id = job.id.clone();
        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();
        let after_first = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(after_first.state, JobState::Failed { .. }));
        assert_eq!(after_first.attempt, 1);

        // Manual (operator-driven) re-enqueue of a permanently failed job.
        let mut manual = after_first;
        requeue(&storage, &mut manual).await;

        processor.process_job(manual).await.unwrap();
        let after_second = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(after_second.state, JobState::Failed { .. }));
        assert_eq!(after_second.attempt, 2);
    }

    #[tokio::test]
    async fn state_change_hook_fires_for_successful_path() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("hook_success", true, false);

        let config = WorkerConfig::new("hook-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);
        let (processor, transitions) = install_recording_hook(processor);

        let job = Job::new("hook_success", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let transitions = transitions.lock().unwrap();
        assert_eq!(
            *transitions,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::Succeeded),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_skips_intermediate_failed_in_retry_path() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("hook_retry", false, true);

        let config = WorkerConfig::new("hook-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 3));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);
        let (processor, transitions) = install_recording_hook(processor);

        let job = Job::new("hook_retry", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let transitions = transitions.lock().unwrap();
        assert_eq!(
            *transitions,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::AwaitingRetry),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_fires_for_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("hook_fail", false, false);

        let config = WorkerConfig::new("hook-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);
        let (processor, transitions) = install_recording_hook(processor);

        let job = Job::new("hook_fail", serde_json::Value::Null);
        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let transitions = transitions.lock().unwrap();
        assert_eq!(
            *transitions,
            vec![
                (JobStateKind::Enqueued, JobStateKind::Processing),
                (JobStateKind::Processing, JobStateKind::Failed),
            ]
        );
    }

    #[tokio::test]
    async fn state_change_hook_is_opt_in_default_is_no_op() {
        let storage = Arc::new(MemoryStorage::new());
        let registry = registry_with("no_hook", true, false);

        let config = WorkerConfig::new("hook-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("no_hook", serde_json::Value::Null);
        let job_id = job.id.clone();
        storage.enqueue(&job).await.unwrap();
        processor.process_job(job).await.unwrap();

        let final_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(final_job.state, JobState::Succeeded { .. }));
    }
}
825}