1use crate::job_queue::{JobQueue, QueueError};
8use chrono::Utc;
9use std::sync::Arc;
10use std::time::Duration;
11use thiserror::Error;
12use tokio::task::JoinHandle;
13use uuid::Uuid;
14
/// Executes a single claimed job and produces its JSON result payload.
///
/// Implementors perform the actual work; the worker pool handles claiming,
/// retries, and status bookkeeping around this call.
#[async_trait::async_trait]
pub trait JobExecutor<J>: Send + Sync
where
    J: Send + Sync,
{
    /// Runs the job to completion.
    ///
    /// Returns the job's results as JSON on success, or an
    /// [`ExecutorError`] describing why execution failed.
    async fn execute(&self, job: &J) -> Result<serde_json::Value, ExecutorError>;
}
47
/// Persistence backend and state machine for jobs.
///
/// Implementations back the worker pool with atomic claiming, status
/// transitions (running / completed / failed), retry scheduling, and the
/// recovery queries used at startup and by the stale-job checker.
#[async_trait::async_trait]
pub trait JobStore<J>: Send + Sync
where
    J: Send + Sync,
{
    /// Atomically claims the next runnable job on behalf of `worker_id`.
    ///
    /// Returns `Ok(None)` when nothing is currently claimable.
    async fn claim_next_job(&self, worker_id: &str) -> Result<Option<J>, StoreError>;

    /// Loads a job by its id.
    async fn load_job(&self, job_id: Uuid) -> Result<J, StoreError>;

    /// Reports whether the job has been deleted.
    async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError>;

    /// Transitions the job into the running state.
    async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError>;

    /// Transitions the job into the completed state, persisting its JSON results.
    async fn mark_job_completed(&self, job_id: Uuid, results: serde_json::Value) -> Result<(), StoreError>;

    /// Records a failed attempt and schedules the next one.
    ///
    /// `retry_count` is the already-incremented attempt counter and
    /// `next_retry_at` the earliest time the job may run again; the error
    /// message/details describe the attempt that just failed.
    async fn schedule_retry(
        &self,
        job_id: Uuid,
        retry_count: i32,
        next_retry_at: chrono::DateTime<Utc>,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError>;

    /// Transitions the job into the permanently-failed state with the final error.
    async fn mark_job_failed(
        &self,
        job_id: Uuid,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError>;

    /// Finds jobs that should be re-enqueued at startup.
    ///
    /// Each tuple is `(job_id, job_type, retry_count, max_retries)` — the
    /// shape consumed by the pool's recovery pass.
    async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError>;

    /// Finds jobs that have been running for longer than `threshold_seconds`.
    async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<J>, StoreError>;

    /// Counts jobs currently eligible for claiming.
    async fn count_claimable_jobs(&self) -> Result<i64, StoreError>;
}
113
/// Tuning knobs for [`WorkerPool`].
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Number of concurrent worker tasks to spawn.
    pub worker_count: usize,

    /// Redis key of the job queue.
    pub queue_key: String,

    /// Queue poll timeout (presumably seconds; default 30) — TODO confirm units.
    pub poll_timeout: u64,

    /// Maximum retry attempts before a job is permanently failed.
    pub max_retries: i32,

    /// Base delay between retries (presumably seconds; default 60) — TODO confirm units.
    pub base_retry_delay: i64,

    /// Upper bound on the retry delay (presumably seconds; default 3600) — TODO confirm units.
    pub max_retry_delay: i64,

    /// A RUNNING job older than this many seconds is considered stale.
    pub stale_job_threshold_seconds: i64,

    /// Interval, in seconds, between stale-job checker passes.
    pub stale_check_interval_seconds: u64,
}
141
142impl Default for WorkerPoolConfig {
143 fn default() -> Self {
144 Self {
145 worker_count: 5,
146 queue_key: "jobs:queue".to_string(),
147 poll_timeout: 30,
148 max_retries: 3,
149 base_retry_delay: 60,
150 max_retry_delay: 3600,
151 stale_job_threshold_seconds: 7200,
152 stale_check_interval_seconds: 300,
153 }
154 }
155}
156
/// Pool of asynchronous workers that claim jobs from a shared store,
/// execute them via a [`JobExecutor`], and recover stuck work with a
/// background [`StaleJobChecker`].
pub struct WorkerPool<J, E, S>
where
    J: Send + Sync + 'static,
    E: JobExecutor<J> + 'static,
    S: JobStore<J> + 'static,
{
    /// Pool-wide tuning parameters.
    config: WorkerPoolConfig,
    /// Shared Redis-backed queue, serialized behind an async mutex.
    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
    /// Executor that performs the actual work of each job.
    executor: Arc<E>,
    /// Persistence backend for job state.
    store: Arc<S>,
    /// Handles of spawned tasks: the workers plus the stale-job checker.
    workers: Vec<JoinHandle<()>>,
    /// Ties the job type `J` to the pool without storing one.
    _phantom: std::marker::PhantomData<J>,
}
171
172impl<J, E, S> WorkerPool<J, E, S>
173where
174 J: Send + Sync + 'static + crate::job::Job,
175 E: JobExecutor<J> + 'static,
176 S: JobStore<J> + 'static,
177{
178 pub fn new(
180 config: WorkerPoolConfig,
181 redis_queue: JobQueue,
182 executor: E,
183 store: S,
184 ) -> Self {
185 Self {
186 config,
187 redis_queue: Arc::new(tokio::sync::Mutex::new(redis_queue)),
188 executor: Arc::new(executor),
189 store: Arc::new(store),
190 workers: Vec::new(),
191 _phantom: std::marker::PhantomData,
192 }
193 }
194
195 pub async fn start(&mut self) -> Result<(), WorkerError> {
200 tracing::info!(
201 "🚀 [WORKER_POOL] Starting worker pool with {} workers + 1 stale job checker",
202 self.config.worker_count
203 );
204
205 self.recover_orphaned_jobs().await?;
207
208 for worker_id in 0..self.config.worker_count {
210 let worker = Worker::new(
211 worker_id,
212 self.config.clone(),
213 self.redis_queue.clone(),
214 self.executor.clone(),
215 self.store.clone(),
216 );
217
218 let handle = tokio::spawn(async move {
219 if let Err(e) = worker.run().await {
220 tracing::error!("❌ [WORKER_{}] Worker failed: {}", worker_id, e);
221 }
222 });
223
224 self.workers.push(handle);
225 }
226
227 let checker = StaleJobChecker::new(
229 self.config.clone(),
230 self.redis_queue.clone(),
231 self.store.clone(),
232 );
233
234 let handle = tokio::spawn(async move {
235 if let Err(e) = checker.run().await {
236 tracing::error!("❌ [STALE_CHECKER] Stale job checker failed: {}", e);
237 }
238 });
239
240 self.workers.push(handle);
241
242 tracing::info!(
243 "✅ [WORKER_POOL] All workers started successfully - stale_threshold: {}s",
244 self.config.stale_job_threshold_seconds
245 );
246
247 Ok(())
248 }
249
250 async fn recover_orphaned_jobs(&mut self) -> Result<(), WorkerError> {
252 tracing::info!("🔍 [RECOVERY] Scanning database for orphaned jobs to recover...");
253
254 let orphaned_jobs = self
255 .store
256 .find_orphaned_jobs()
257 .await
258 .map_err(|e| WorkerError::StoreError(e.to_string()))?;
259
260 if orphaned_jobs.is_empty() {
261 tracing::info!("✅ [RECOVERY] No orphaned jobs found - queue is clean");
262 return Ok(());
263 }
264
265 tracing::info!(
266 "📋 [RECOVERY] Found {} orphaned jobs to recover",
267 orphaned_jobs.len()
268 );
269
270 let mut recovered = 0;
271 let mut failed_recovery = 0;
272
273 for (job_id, job_type, retry_count, max_retries) in orphaned_jobs {
274 tracing::debug!(
275 "🔄 [RECOVERY] Processing job {} (type: {}, retry: {}/{})",
276 job_id,
277 job_type,
278 retry_count,
279 max_retries
280 );
281
282 match self.redis_queue.lock().await.enqueue(job_id, job_type.clone()).await {
283 Ok(_) => {
284 recovered += 1;
285 tracing::info!(
286 "✅ [RECOVERY] Re-enqueued job {} to Redis",
287 job_id
288 );
289 }
290 Err(e) => {
291 failed_recovery += 1;
292 tracing::error!(
293 "❌ [RECOVERY] Failed to re-enqueue job {}: {}",
294 job_id,
295 e
296 );
297 }
298 }
299 }
300
301 tracing::info!(
302 "✅ [RECOVERY] Recovery complete - recovered: {}, failed: {}",
303 recovered,
304 failed_recovery
305 );
306
307 Ok(())
308 }
309
310 pub async fn stop(&mut self) {
312 tracing::info!("Stopping worker pool");
313
314 for handle in self.workers.drain(..) {
315 handle.abort();
316 }
317 }
318
319 pub async fn wait(self) -> Result<(), WorkerError> {
321 for handle in self.workers {
322 handle
323 .await
324 .map_err(|e| WorkerError::WorkerFailed(e.to_string()))?;
325 }
326 Ok(())
327 }
328}
329
/// A single job-processing loop; one `Worker` runs per spawned pool task.
struct Worker<J, E, S>
where
    J: Send + Sync,
    E: JobExecutor<J>,
    S: JobStore<J>,
{
    /// Zero-based worker index, used in the `worker_{id}` claim id and logs.
    id: usize,
    /// Copy of the pool configuration.
    config: WorkerPoolConfig,
    /// Shared Redis-backed queue (also used for per-job-type locks).
    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
    /// Executor that performs the actual work of each job.
    executor: Arc<E>,
    /// Persistence backend for job state.
    store: Arc<S>,
    /// Current sleep before retrying after a lock conflict, in milliseconds;
    /// doubles on each consecutive failure.
    current_backoff_ms: u64,
    /// Cap on `current_backoff_ms`.
    max_backoff_ms: u64,
    /// Consecutive lock-conflict count; reset to zero after a success.
    consecutive_lock_failures: u32,
    /// Ties the job type `J` to the worker without storing one.
    _phantom: std::marker::PhantomData<J>,
}
347
impl<J, E, S> Worker<J, E, S>
where
    J: Send + Sync + crate::job::Job,
    E: JobExecutor<J>,
    S: JobStore<J>,
{
    /// Creates a worker with its initial backoff state: 100 ms starting
    /// backoff, capped at 60 s, zero consecutive lock failures.
    fn new(
        id: usize,
        config: WorkerPoolConfig,
        redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
        executor: Arc<E>,
        store: Arc<S>,
    ) -> Self {
        Self {
            id,
            config,
            redis_queue,
            executor,
            store,
            current_backoff_ms: 100,
            max_backoff_ms: 60_000,
            consecutive_lock_failures: 0,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Main worker loop: atomically claim a job from the store, take the
    /// per-job-type Redis lock, then process it with retry bookkeeping.
    /// Sleeps 5 s when idle and 1 s after a claim error. Runs forever —
    /// the pool stops it by aborting the task.
    async fn run(mut self) -> Result<(), WorkerError> {
        let worker_id = format!("worker_{}", self.id);
        tracing::info!(
            "🟢 [WORKER_{}] Started with ID '{}' - atomic database-driven job claiming",
            self.id,
            worker_id
        );

        loop {
            match self.store.claim_next_job(&worker_id).await {
                Ok(Some(job)) => {
                    let job_id = job.id();
                    let job_type_str = job.job_type();

                    tracing::info!(
                        "📦 [WORKER_{}] Atomically claimed job {} (type: {}, retry: {}/{})",
                        self.id,
                        job_id,
                        job_type_str,
                        job.retry_count(),
                        job.max_retries()
                    );

                    // Per-job lock TTL override from the job's configuration;
                    // defaults to 172_800 s (48 h) when absent or non-numeric.
                    let lock_ttl_seconds = job
                        .configuration()
                        .and_then(|config| config.get("redis_lock_ttl_seconds"))
                        .and_then(|v| v.as_u64())
                        .unwrap_or(172800);

                    let mut queue = self.redis_queue.lock().await;
                    let lock_acquired = match queue
                        .acquire_job_type_lock(&job_type_str, job.parameters(), lock_ttl_seconds)
                        .await
                    {
                        Ok(acquired) => acquired,
                        Err(e) => {
                            tracing::error!(
                                "❌ [WORKER_{}] Failed to check lock for job {}: {}",
                                self.id,
                                job_id,
                                e
                            );
                            // Fail open: if the lock check itself errors
                            // (e.g. Redis unreachable), proceed with the job
                            // rather than stalling it forever.
                            true
                        }
                    };
                    // Release the queue mutex before the long-running
                    // execution so other workers are not blocked on it.
                    drop(queue);

                    if !lock_acquired {
                        // Another holder owns this job type's lock: back off
                        // exponentially (doubling, capped at max_backoff_ms).
                        //
                        // NOTE(review): the job was already claimed by this
                        // worker in the store, and nothing visible here
                        // un-claims it before `continue` — confirm the claim
                        // expires or is recovered as orphaned elsewhere.
                        self.consecutive_lock_failures += 1;
                        tracing::warn!(
                            "🔒 [WORKER_{}] Job {} skipped - lock held",
                            self.id,
                            job_id
                        );

                        tokio::time::sleep(Duration::from_millis(self.current_backoff_ms)).await;
                        self.current_backoff_ms = std::cmp::min(self.current_backoff_ms * 2, self.max_backoff_ms);
                        continue;
                    }

                    // Lock acquired: reset the backoff state.
                    if self.consecutive_lock_failures > 0 {
                        self.consecutive_lock_failures = 0;
                        self.current_backoff_ms = 100;
                    }

                    if let Err(e) = self.process_job_with_retry(job).await {
                        tracing::error!(
                            "❌ [WORKER_{}] Failed to process job {}: {}",
                            self.id,
                            job_id,
                            e
                        );
                    }
                }
                Ok(None) => {
                    // Queue drained: idle briefly, then log a heads-up if
                    // claimable work appeared in the meantime.
                    tracing::trace!(
                        "⏱️ [WORKER_{}] No claimable jobs - sleeping 5s",
                        self.id
                    );
                    tokio::time::sleep(Duration::from_secs(5)).await;

                    if let Err(e) = self.check_orphaned_jobs().await {
                        tracing::error!(
                            "❌ [WORKER_{}] Failed to check orphaned jobs: {}",
                            self.id,
                            e
                        );
                    }
                }
                Err(e) => {
                    tracing::error!("❌ [WORKER_{}] Failed to claim job: {}", self.id, e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
        }
    }

    /// Runs one execution attempt of `job` and records the outcome:
    /// completed on success; a scheduled retry while `job.can_retry()`;
    /// otherwise a permanent failure.
    ///
    /// # Errors
    ///
    /// Returns [`WorkerError::StoreError`] if any status update fails;
    /// executor failures themselves are absorbed into retry/failure state.
    async fn process_job_with_retry(&self, job: J) -> Result<(), WorkerError> {
        let job_id = job.id();

        tracing::info!(
            "🎬 [WORKER_{}] Starting job {} (retry: {}/{})",
            self.id,
            job_id,
            job.retry_count(),
            job.max_retries()
        );

        self.store
            .mark_job_running(job_id)
            .await
            .map_err(|e| WorkerError::StoreError(e.to_string()))?;

        match self.executor.execute(&job).await {
            Ok(results) => {
                self.store
                    .mark_job_completed(job_id, results)
                    .await
                    .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                tracing::info!(
                    "✅ [WORKER_{}] Job {} completed successfully",
                    self.id,
                    job_id
                );
            }
            Err(e) => {
                let error_message = e.to_string();

                if job.can_retry() {
                    // Next attempt time is computed by the job itself
                    // (presumably from its retry policy — see crate::job::Job).
                    let next_retry_at = job.calculate_next_retry_at();
                    let error_details = self.build_error_details(&error_message, "EXECUTION_FAILED", &job);

                    self.store
                        .schedule_retry(
                            job_id,
                            job.retry_count() + 1,
                            next_retry_at,
                            &error_message,
                            error_details,
                        )
                        .await
                        .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                    tracing::warn!(
                        "⚠️ [WORKER_{}] Job {} failed (attempt {}/{}), will retry at {}",
                        self.id,
                        job_id,
                        job.retry_count() + 1,
                        job.max_retries(),
                        next_retry_at
                    );
                } else {
                    // Retries exhausted: record a terminal failure.
                    let error_details = self.build_error_details(
                        &error_message,
                        "MAX_RETRIES_EXCEEDED",
                        &job,
                    );

                    self.store
                        .mark_job_failed(job_id, &error_message, error_details)
                        .await
                        .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                    tracing::error!(
                        "❌ [WORKER_{}] Job {} PERMANENTLY FAILED after {} attempts",
                        self.id,
                        job_id,
                        job.retry_count()
                    );
                }
            }
        }

        Ok(())
    }

    /// Builds the structured JSON error payload stored alongside a failed
    /// or retried job: code, message, worker id, job metadata, and an
    /// RFC 3339 UTC timestamp.
    fn build_error_details(&self, error_message: &str, error_code: &str, job: &J) -> serde_json::Value {
        serde_json::json!({
            "error_code": error_code,
            "error_message": error_message,
            "worker_id": self.id,
            "job_type": job.job_type(),
            "retry_count": job.retry_count(),
            "max_retries": job.max_retries(),
            "timestamp": Utc::now().to_rfc3339(),
        })
    }

    /// Logs when claimable jobs exist while this worker is idle.
    /// Observability only — it does not claim or modify anything.
    async fn check_orphaned_jobs(&self) -> Result<(), WorkerError> {
        let count = self
            .store
            .count_claimable_jobs()
            .await
            .map_err(|e| WorkerError::StoreError(e.to_string()))?;

        if count > 0 {
            tracing::info!(
                "🔄 [WORKER_{}] Found {} claimable jobs",
                self.id,
                count
            );
        }

        Ok(())
    }
}
593
/// Background task that periodically recovers jobs stuck in the RUNNING
/// state (e.g. after a worker crash).
struct StaleJobChecker<J, S>
where
    J: Send + Sync,
    S: JobStore<J>,
{
    /// Pool configuration (check interval and staleness threshold).
    config: WorkerPoolConfig,
    /// Shared Redis-backed queue.
    /// NOTE(review): not read by any method visible here — confirm whether
    /// it is still needed or is a leftover dependency.
    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
    /// Persistence backend queried for stale jobs.
    store: Arc<S>,
    /// Ties the job type `J` to the checker without storing one.
    _phantom: std::marker::PhantomData<J>,
}
605
606impl<J, S> StaleJobChecker<J, S>
607where
608 J: Send + Sync + crate::job::Job,
609 S: JobStore<J>,
610{
611 fn new(
612 config: WorkerPoolConfig,
613 redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
614 store: Arc<S>,
615 ) -> Self {
616 Self {
617 config,
618 redis_queue,
619 store,
620 _phantom: std::marker::PhantomData,
621 }
622 }
623
624 async fn run(self) -> Result<(), WorkerError> {
625 tracing::info!(
626 "🔍 [STALE_CHECKER] Started - checking every {}s for jobs running longer than {}s",
627 self.config.stale_check_interval_seconds,
628 self.config.stale_job_threshold_seconds
629 );
630
631 let check_interval = Duration::from_secs(self.config.stale_check_interval_seconds);
632
633 loop {
634 tokio::time::sleep(check_interval).await;
635
636 if let Err(e) = self.check_and_recover_stale_jobs().await {
637 tracing::error!("❌ [STALE_CHECKER] Failed to check stale jobs: {}", e);
638 }
639 }
640 }
641
642 async fn check_and_recover_stale_jobs(&self) -> Result<(), WorkerError> {
643 let stale_jobs = self
644 .store
645 .find_stale_jobs(self.config.stale_job_threshold_seconds)
646 .await
647 .map_err(|e| WorkerError::StoreError(e.to_string()))?;
648
649 if stale_jobs.is_empty() {
650 tracing::debug!("✓ [STALE_CHECKER] No stale jobs found");
651 return Ok(());
652 }
653
654 tracing::warn!(
655 "⚠️ [STALE_CHECKER] Found {} stale jobs - recovering them",
656 stale_jobs.len()
657 );
658
659 for job in stale_jobs {
660 let job_id = job.id();
661
662 if job.can_retry() {
663 let next_retry_at = job.calculate_next_retry_at();
665 let error_message = format!(
666 "Job was stuck in RUNNING state - likely due to worker crash"
667 );
668 let error_details = serde_json::json!({
669 "error_code": "STALE_JOB_RECOVERY",
670 "reason": "Job automatically recovered from RUNNING state",
671 "threshold_seconds": self.config.stale_job_threshold_seconds,
672 });
673
674 self.store
675 .schedule_retry(
676 job_id,
677 job.retry_count() + 1,
678 next_retry_at,
679 &error_message,
680 error_details,
681 )
682 .await
683 .map_err(|e| WorkerError::StoreError(e.to_string()))?;
684
685 tracing::info!(
686 "✅ [STALE_CHECKER] Job {} scheduled for retry",
687 job_id
688 );
689 } else {
690 let error_message = format!(
692 "Job failed permanently: stuck in RUNNING state after {} retries",
693 job.retry_count()
694 );
695 let error_details = serde_json::json!({
696 "error_code": "STALE_JOB_MAX_RETRIES_EXCEEDED",
697 "reason": "Job stuck in RUNNING state and exhausted all retries",
698 });
699
700 self.store
701 .mark_job_failed(job_id, &error_message, error_details)
702 .await
703 .map_err(|e| WorkerError::StoreError(e.to_string()))?;
704
705 tracing::error!(
706 "🔴 [STALE_CHECKER] Job {} marked as PERMANENTLY FAILED",
707 job_id
708 );
709 }
710 }
711
712 Ok(())
713 }
714}
715
/// Errors surfaced by the worker pool, its workers, and the stale checker.
#[derive(Error, Debug)]
pub enum WorkerError {
    /// A [`JobStore`] operation failed (message is the stringified cause).
    #[error("Store error: {0}")]
    StoreError(String),

    /// A Redis queue operation failed; see the `From<QueueError>` impl.
    #[error("Queue error: {0}")]
    QueueError(String),

    /// A spawned task's `JoinHandle` resolved with an error.
    #[error("Worker failed: {0}")]
    WorkerFailed(String),

    /// A job execution failed at the executor level.
    #[error("Execution error: {0}")]
    ExecutionError(String),
}
731
732impl From<QueueError> for WorkerError {
733 fn from(err: QueueError) -> Self {
734 WorkerError::QueueError(err.to_string())
735 }
736}
737
/// Errors returned by [`JobStore`] implementations.
#[derive(Error, Debug)]
pub enum StoreError {
    /// The underlying database operation failed.
    #[error("Database error: {0}")]
    DatabaseError(String),

    /// No job exists with the given id.
    #[error("Job not found: {0}")]
    JobNotFound(Uuid),

    /// Job data could not be (de)serialized.
    #[error("Serialization error: {0}")]
    SerializationError(String),
}
750
/// Errors returned by [`JobExecutor`] implementations.
#[derive(Error, Debug)]
pub enum ExecutorError {
    /// The job's work itself failed.
    #[error("Execution failed: {0}")]
    ExecutionFailed(String),

    /// The job was cancelled before completion.
    #[error("Job was cancelled")]
    JobCancelled,

    /// An upstream/provider call failed during execution.
    #[error("Provider error: {0}")]
    ProviderError(String),

    /// The job's inputs failed validation.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
766
#[cfg(test)]
mod tests {
    use super::*;

    /// `WorkerPoolConfig::default()` must yield the documented baseline;
    /// destructuring ensures the test breaks loudly if a field is added.
    #[test]
    fn test_worker_pool_config_defaults() {
        let WorkerPoolConfig {
            worker_count,
            queue_key,
            poll_timeout,
            max_retries,
            base_retry_delay,
            max_retry_delay,
            stale_job_threshold_seconds,
            stale_check_interval_seconds,
        } = WorkerPoolConfig::default();

        assert_eq!(worker_count, 5);
        assert_eq!(queue_key, "jobs:queue");
        assert_eq!(poll_timeout, 30);
        assert_eq!(max_retries, 3);
        assert_eq!(base_retry_delay, 60);
        assert_eq!(max_retry_delay, 3600);
        assert_eq!(stale_job_threshold_seconds, 7200);
        assert_eq!(stale_check_interval_seconds, 300);
    }
}