// pleme_redis/worker_pool.rs
1//! Generic worker pool for processing Redis-backed job queues
2//!
3//! This module provides a generic worker pool that can process any job type
4//! that implements the `Job` trait. Services provide their own `JobExecutor`
5//! and `JobStore` implementations for business logic and persistence.
6
7use crate::job_queue::{JobQueue, QueueError};
8use chrono::Utc;
9use std::sync::Arc;
10use std::time::Duration;
11use thiserror::Error;
12use tokio::task::JoinHandle;
13use uuid::Uuid;
14
/// Job executor trait - services implement this for business logic
///
/// Implementors receive a claimed job and run the service-specific work,
/// returning a JSON payload that the worker pool persists via
/// [`JobStore::mark_job_completed`].
///
/// # Example
/// ```rust,ignore
/// // `ignore`: MyJob is a service-defined type, not available in doctests.
/// use pleme_redis::worker_pool::{ExecutorError, JobExecutor};
/// use async_trait::async_trait;
///
/// struct MyJobExecutor;
///
/// #[async_trait]
/// impl JobExecutor<MyJob> for MyJobExecutor {
///     async fn execute(&self, job: &MyJob) -> Result<serde_json::Value, ExecutorError> {
///         // Business logic here
///         Ok(serde_json::json!({"status": "success"}))
///     }
/// }
/// ```
#[async_trait::async_trait]
pub trait JobExecutor<J>: Send + Sync
where
    J: Send + Sync,
{
    /// Execute a job and return results
    ///
    /// # Arguments
    /// * `job` - Job to execute
    ///
    /// # Returns
    /// * `Ok(Value)` - Job results (serializable JSON)
    /// * `Err(ExecutorError)` - Execution failed
    async fn execute(&self, job: &J) -> Result<serde_json::Value, ExecutorError>;
}
47
/// Job store trait - services implement this for database persistence
///
/// This trait abstracts database operations so the worker pool can work
/// with any persistence layer (PostgreSQL, MongoDB, etc.).
#[async_trait::async_trait]
pub trait JobStore<J>: Send + Sync
where
    J: Send + Sync,
{
    /// Atomically claim next available job
    ///
    /// Should use SELECT FOR UPDATE SKIP LOCKED or equivalent
    /// to ensure only one worker claims a job.
    ///
    /// # Arguments
    /// * `worker_id` - Worker identifier for tracking
    ///
    /// # Returns
    /// * `Some(Job)` - Job successfully claimed
    /// * `None` - No jobs available
    async fn claim_next_job(&self, worker_id: &str) -> Result<Option<J>, StoreError>;

    /// Load job by ID
    ///
    /// Should return [`StoreError::JobNotFound`] when no job matches `job_id`.
    // NOTE(review): not called anywhere in this module; presumably used by
    // services directly — confirm before removing.
    async fn load_job(&self, job_id: Uuid) -> Result<J, StoreError>;

    /// Check if job was soft-deleted (for cancellation support)
    // NOTE(review): also not called in this module's visible code — confirm.
    async fn is_job_deleted(&self, job_id: Uuid) -> Result<bool, StoreError>;

    /// Mark job as running
    ///
    /// Called by a worker immediately after claiming, before execution starts.
    async fn mark_job_running(&self, job_id: Uuid) -> Result<(), StoreError>;

    /// Mark job as completed with results
    ///
    /// `results` is the JSON payload returned by [`JobExecutor::execute`].
    async fn mark_job_completed(&self, job_id: Uuid, results: serde_json::Value) -> Result<(), StoreError>;

    /// Schedule job retry with exponential backoff
    ///
    /// # Arguments
    /// * `job_id` - Job to reschedule
    /// * `retry_count` - The NEW retry count (callers pass `job.retry_count() + 1`)
    /// * `next_retry_at` - Earliest time the job may be claimed again
    /// * `error_message` - Human-readable failure summary
    /// * `error_details` - Structured JSON diagnostics from the failed attempt
    async fn schedule_retry(
        &self,
        job_id: Uuid,
        retry_count: i32,
        next_retry_at: chrono::DateTime<Utc>,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError>;

    /// Mark job as permanently failed
    ///
    /// Called once a job's retry budget is exhausted; the job will not run again.
    async fn mark_job_failed(
        &self,
        job_id: Uuid,
        error_message: &str,
        error_details: serde_json::Value,
    ) -> Result<(), StoreError>;

    /// Find orphaned jobs (PENDING/QUEUED without worker)
    ///
    /// Used for startup recovery
    ///
    /// # Returns
    /// Tuples of `(job_id, job_type, retry_count, max_retries)` — the exact
    /// shape the pool's recovery loop destructures when re-enqueueing to Redis.
    async fn find_orphaned_jobs(&self) -> Result<Vec<(Uuid, String, i32, i32)>, StoreError>;

    /// Find stale jobs (RUNNING without updates)
    ///
    /// Used by stale job checker
    ///
    /// # Arguments
    /// * `threshold_seconds` - Jobs in RUNNING state longer than this are stale
    async fn find_stale_jobs(&self, threshold_seconds: i64) -> Result<Vec<J>, StoreError>;

    /// Count claimable jobs (for monitoring)
    async fn count_claimable_jobs(&self) -> Result<i64, StoreError>;
}
113
/// Worker pool configuration
///
/// Passed to [`WorkerPool::new`] and cloned into every worker task and the
/// stale-job checker. See [`Default`] for the stock values.
#[derive(Debug, Clone)]
pub struct WorkerPoolConfig {
    /// Number of concurrent workers
    pub worker_count: usize,

    /// Redis queue key
    // NOTE(review): not read by this module's visible code; presumably
    // consumed by `JobQueue` — confirm.
    pub queue_key: String,

    /// Worker poll timeout (seconds)
    // NOTE(review): workers currently hard-code a 5s idle sleep; this field
    // is not read here — confirm intended consumer.
    pub poll_timeout: u64,

    /// Maximum retries per job (default: 3)
    pub max_retries: i32,

    /// Base delay for exponential backoff (seconds, default: 60)
    pub base_retry_delay: i64,

    /// Maximum delay for exponential backoff (seconds, default: 3600)
    pub max_retry_delay: i64,

    /// Threshold for detecting stale RUNNING jobs in seconds (default: 7200 = 2 hours)
    pub stale_job_threshold_seconds: i64,

    /// How often to check for stale jobs in seconds (default: 300 = 5 minutes)
    pub stale_check_interval_seconds: u64,
}
141
142impl Default for WorkerPoolConfig {
143    fn default() -> Self {
144        Self {
145            worker_count: 5,
146            queue_key: "jobs:queue".to_string(),
147            poll_timeout: 30,
148            max_retries: 3,
149            base_retry_delay: 60,
150            max_retry_delay: 3600,
151            stale_job_threshold_seconds: 7200,
152            stale_check_interval_seconds: 300,
153        }
154    }
155}
156
157/// Generic worker pool for processing jobs
158pub struct WorkerPool<J, E, S>
159where
160    J: Send + Sync + 'static,
161    E: JobExecutor<J> + 'static,
162    S: JobStore<J> + 'static,
163{
164    config: WorkerPoolConfig,
165    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
166    executor: Arc<E>,
167    store: Arc<S>,
168    workers: Vec<JoinHandle<()>>,
169    _phantom: std::marker::PhantomData<J>,
170}
171
impl<J, E, S> WorkerPool<J, E, S>
where
    J: Send + Sync + 'static + crate::job::Job,
    E: JobExecutor<J> + 'static,
    S: JobStore<J> + 'static,
{
    /// Create new worker pool
    ///
    /// Wraps the queue in an async mutex and the executor/store in `Arc`s so
    /// they can be shared with every spawned task. No tasks run until
    /// [`start`](Self::start) is called.
    pub fn new(
        config: WorkerPoolConfig,
        redis_queue: JobQueue,
        executor: E,
        store: S,
    ) -> Self {
        Self {
            config,
            redis_queue: Arc::new(tokio::sync::Mutex::new(redis_queue)),
            executor: Arc::new(executor),
            store: Arc::new(store),
            workers: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Start all workers
    ///
    /// Spawns `worker_count` concurrent workers that poll the queue and process jobs.
    /// Also spawns a dedicated stale job checker that runs periodically.
    ///
    /// Orphaned-job recovery runs first, so re-enqueued work is visible to the
    /// workers as soon as they come up.
    ///
    /// NOTE(review): calling `start` twice would spawn a second set of workers;
    /// callers appear expected to invoke it exactly once — confirm.
    pub async fn start(&mut self) -> Result<(), WorkerError> {
        tracing::info!(
            "🚀 [WORKER_POOL] Starting worker pool with {} workers + 1 stale job checker",
            self.config.worker_count
        );

        // CRITICAL: Recover orphaned jobs from database before starting workers
        self.recover_orphaned_jobs().await?;

        // Spawn job processing workers
        for worker_id in 0..self.config.worker_count {
            let worker = Worker::new(
                worker_id,
                self.config.clone(),
                self.redis_queue.clone(),
                self.executor.clone(),
                self.store.clone(),
            );

            // Each worker loops forever; an Err return is terminal for that
            // worker and is only logged (the pool does not respawn it).
            let handle = tokio::spawn(async move {
                if let Err(e) = worker.run().await {
                    tracing::error!("❌ [WORKER_{}] Worker failed: {}", worker_id, e);
                }
            });

            self.workers.push(handle);
        }

        // Spawn dedicated stale job checker
        let checker = StaleJobChecker::new(
            self.config.clone(),
            self.redis_queue.clone(),
            self.store.clone(),
        );

        let handle = tokio::spawn(async move {
            if let Err(e) = checker.run().await {
                tracing::error!("❌ [STALE_CHECKER] Stale job checker failed: {}", e);
            }
        });

        // The checker handle shares the same Vec, so `stop`/`wait` cover it too.
        self.workers.push(handle);

        tracing::info!(
            "✅ [WORKER_POOL] All workers started successfully - stale_threshold: {}s",
            self.config.stale_job_threshold_seconds
        );

        Ok(())
    }

    /// Recover orphaned jobs from database and re-enqueue to Redis
    ///
    /// Runs once during `start`, before any worker exists. Jobs the store
    /// reports as PENDING/QUEUED with no worker (e.g. after a crash) are
    /// pushed back onto the Redis queue. Per-job enqueue failures are logged
    /// and counted but do not abort startup; only a store scan failure
    /// propagates as an error.
    async fn recover_orphaned_jobs(&mut self) -> Result<(), WorkerError> {
        tracing::info!("🔍 [RECOVERY] Scanning database for orphaned jobs to recover...");

        let orphaned_jobs = self
            .store
            .find_orphaned_jobs()
            .await
            .map_err(|e| WorkerError::StoreError(e.to_string()))?;

        if orphaned_jobs.is_empty() {
            tracing::info!("✅ [RECOVERY] No orphaned jobs found - queue is clean");
            return Ok(());
        }

        tracing::info!(
            "📋 [RECOVERY] Found {} orphaned jobs to recover",
            orphaned_jobs.len()
        );

        let mut recovered = 0;
        let mut failed_recovery = 0;

        // Tuple shape matches `JobStore::find_orphaned_jobs`:
        // (job_id, job_type, retry_count, max_retries).
        for (job_id, job_type, retry_count, max_retries) in orphaned_jobs {
            tracing::debug!(
                "🔄 [RECOVERY] Processing job {} (type: {}, retry: {}/{})",
                job_id,
                job_type,
                retry_count,
                max_retries
            );

            // The queue mutex is held only for the duration of this one enqueue.
            match self.redis_queue.lock().await.enqueue(job_id, job_type.clone()).await {
                Ok(_) => {
                    recovered += 1;
                    tracing::info!(
                        "✅ [RECOVERY] Re-enqueued job {} to Redis",
                        job_id
                    );
                }
                Err(e) => {
                    failed_recovery += 1;
                    tracing::error!(
                        "❌ [RECOVERY] Failed to re-enqueue job {}: {}",
                        job_id,
                        e
                    );
                }
            }
        }

        tracing::info!(
            "✅ [RECOVERY] Recovery complete - recovered: {}, failed: {}",
            recovered,
            failed_recovery
        );

        Ok(())
    }

    /// Stop all workers
    ///
    /// Aborts every spawned task immediately — there is no graceful drain, so
    /// a job mid-execution is killed with its task.
    /// NOTE(review): presumably such jobs are later recovered as stale RUNNING
    /// jobs on the next startup's checker pass — confirm.
    pub async fn stop(&mut self) {
        tracing::info!("Stopping worker pool");

        for handle in self.workers.drain(..) {
            handle.abort();
        }
    }

    /// Wait for all workers to complete
    ///
    /// Consumes the pool. Worker loops run indefinitely, so this returns only
    /// when tasks end abnormally; a panicked (or previously aborted) task
    /// surfaces its `JoinError` as `WorkerError::WorkerFailed`.
    pub async fn wait(self) -> Result<(), WorkerError> {
        for handle in self.workers {
            handle
                .await
                .map_err(|e| WorkerError::WorkerFailed(e.to_string()))?;
        }
        Ok(())
    }
}
329
330/// Individual worker
331struct Worker<J, E, S>
332where
333    J: Send + Sync,
334    E: JobExecutor<J>,
335    S: JobStore<J>,
336{
337    id: usize,
338    config: WorkerPoolConfig,
339    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
340    executor: Arc<E>,
341    store: Arc<S>,
342    current_backoff_ms: u64,
343    max_backoff_ms: u64,
344    consecutive_lock_failures: u32,
345    _phantom: std::marker::PhantomData<J>,
346}
347
impl<J, E, S> Worker<J, E, S>
where
    J: Send + Sync + crate::job::Job,
    E: JobExecutor<J>,
    S: JobStore<J>,
{
    /// Build a worker with its initial backoff state: 100ms, doubling up to
    /// a 60s cap on consecutive distributed-lock misses.
    fn new(
        id: usize,
        config: WorkerPoolConfig,
        redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
        executor: Arc<E>,
        store: Arc<S>,
    ) -> Self {
        Self {
            id,
            config,
            redis_queue,
            executor,
            store,
            current_backoff_ms: 100,
            max_backoff_ms: 60_000,
            consecutive_lock_failures: 0,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Main worker loop — runs until the task is aborted by `WorkerPool::stop`
    /// (it never returns `Ok` on its own).
    ///
    /// Each iteration: atomically claim a job via the store, try to take the
    /// per-job-type distributed lock in Redis, then process with retry
    /// bookkeeping. An empty queue sleeps 5s; a claim error sleeps 1s; lock
    /// contention applies exponential backoff.
    async fn run(mut self) -> Result<(), WorkerError> {
        let worker_id = format!("worker_{}", self.id);
        tracing::info!(
            "🟢 [WORKER_{}] Started with ID '{}' - atomic database-driven job claiming",
            self.id,
            worker_id
        );

        loop {
            match self.store.claim_next_job(&worker_id).await {
                Ok(Some(job)) => {
                    let job_id = job.id();
                    let job_type_str = job.job_type();

                    tracing::info!(
                        "📦 [WORKER_{}] Atomically claimed job {} (type: {}, retry: {}/{})",
                        self.id,
                        job_id,
                        job_type_str,
                        job.retry_count(),
                        job.max_retries()
                    );

                    // Get lock TTL from configuration
                    // (per-job JSON key "redis_lock_ttl_seconds", if present).
                    let lock_ttl_seconds = job
                        .configuration()
                        .and_then(|config| config.get("redis_lock_ttl_seconds"))
                        .and_then(|v| v.as_u64())
                        .unwrap_or(172800); // Default: 48 hours

                    // Check distributed lock
                    let mut queue = self.redis_queue.lock().await;
                    let lock_acquired = match queue
                        .acquire_job_type_lock(&job_type_str, job.parameters(), lock_ttl_seconds)
                        .await
                    {
                        Ok(acquired) => acquired,
                        Err(e) => {
                            tracing::error!(
                                "❌ [WORKER_{}] Failed to check lock for job {}: {}",
                                self.id,
                                job_id,
                                e
                            );
                            // Fail-open: a Redis error must not stall job
                            // processing, at the cost of possible duplicate
                            // same-type execution while Redis is down.
                            true // Fail-open
                        }
                    };
                    // Release the queue mutex before the (potentially long)
                    // job execution so other workers can reach Redis.
                    drop(queue);

                    // NOTE(review): no visible release of the job-type lock
                    // after processing; it appears to rely on TTL expiry
                    // (default 48h) or a release performed elsewhere — confirm.

                    if !lock_acquired {
                        self.consecutive_lock_failures += 1;
                        tracing::warn!(
                            "🔒 [WORKER_{}] Job {} skipped - lock held",
                            self.id,
                            job_id
                        );

                        // Apply exponential backoff
                        tokio::time::sleep(Duration::from_millis(self.current_backoff_ms)).await;
                        self.current_backoff_ms = std::cmp::min(self.current_backoff_ms * 2, self.max_backoff_ms);
                        continue;
                    }

                    // Lock acquired - reset backoff
                    if self.consecutive_lock_failures > 0 {
                        self.consecutive_lock_failures = 0;
                        self.current_backoff_ms = 100;
                    }

                    // Process job with retry logic; errors are logged and the
                    // loop continues to the next claim.
                    if let Err(e) = self.process_job_with_retry(job).await {
                        tracing::error!(
                            "❌ [WORKER_{}] Failed to process job {}: {}",
                            self.id,
                            job_id,
                            e
                        );
                    }
                }
                Ok(None) => {
                    // No jobs available - sleep and check for orphaned jobs
                    tracing::trace!(
                        "⏱️  [WORKER_{}] No claimable jobs - sleeping 5s",
                        self.id
                    );
                    tokio::time::sleep(Duration::from_secs(5)).await;

                    if let Err(e) = self.check_orphaned_jobs().await {
                        tracing::error!(
                            "❌ [WORKER_{}] Failed to check orphaned jobs: {}",
                            self.id,
                            e
                        );
                    }
                }
                Err(e) => {
                    // Claim failure (e.g. database hiccup): brief pause, retry.
                    tracing::error!("❌ [WORKER_{}] Failed to claim job: {}", self.id, e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            }
        }
    }

    /// Execute one claimed job and persist the outcome.
    ///
    /// Flow: mark RUNNING → execute → on success mark COMPLETED; on failure
    /// either schedule a retry (while `job.can_retry()`) or mark the job
    /// permanently FAILED. Store errors propagate via `?`.
    ///
    /// NOTE(review): if `mark_job_running` fails here, the job stays claimed
    /// but never executes; presumably the stale-job checker recovers it —
    /// confirm.
    async fn process_job_with_retry(&self, job: J) -> Result<(), WorkerError> {
        let job_id = job.id();

        tracing::info!(
            "🎬 [WORKER_{}] Starting job {} (retry: {}/{})",
            self.id,
            job_id,
            job.retry_count(),
            job.max_retries()
        );

        // Mark as running
        self.store
            .mark_job_running(job_id)
            .await
            .map_err(|e| WorkerError::StoreError(e.to_string()))?;

        // Execute job
        match self.executor.execute(&job).await {
            Ok(results) => {
                // Success
                self.store
                    .mark_job_completed(job_id, results)
                    .await
                    .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                tracing::info!(
                    "✅ [WORKER_{}] Job {} completed successfully",
                    self.id,
                    job_id
                );
            }
            Err(e) => {
                // Execution failed
                let error_message = e.to_string();

                if job.can_retry() {
                    // Schedule retry; the job computes its own next-attempt
                    // time (exponential backoff per the Job trait).
                    let next_retry_at = job.calculate_next_retry_at();
                    let error_details = self.build_error_details(&error_message, "EXECUTION_FAILED", &job);

                    self.store
                        .schedule_retry(
                            job_id,
                            job.retry_count() + 1,
                            next_retry_at,
                            &error_message,
                            error_details,
                        )
                        .await
                        .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                    tracing::warn!(
                        "⚠️  [WORKER_{}] Job {} failed (attempt {}/{}), will retry at {}",
                        self.id,
                        job_id,
                        job.retry_count() + 1,
                        job.max_retries(),
                        next_retry_at
                    );
                } else {
                    // Max retries exceeded
                    let error_details = self.build_error_details(
                        &error_message,
                        "MAX_RETRIES_EXCEEDED",
                        &job,
                    );

                    self.store
                        .mark_job_failed(job_id, &error_message, error_details)
                        .await
                        .map_err(|e| WorkerError::StoreError(e.to_string()))?;

                    tracing::error!(
                        "❌ [WORKER_{}] Job {} PERMANENTLY FAILED after {} attempts",
                        self.id,
                        job_id,
                        job.retry_count()
                    );
                }
            }
        }

        Ok(())
    }

    /// Assemble the structured JSON error payload persisted alongside a
    /// failed or retried job: error code/message, worker id, job metadata,
    /// and an RFC 3339 UTC timestamp.
    fn build_error_details(&self, error_message: &str, error_code: &str, job: &J) -> serde_json::Value {
        serde_json::json!({
            "error_code": error_code,
            "error_message": error_message,
            "worker_id": self.id,
            "job_type": job.job_type(),
            "retry_count": job.retry_count(),
            "max_retries": job.max_retries(),
        })
    }

    /// Log how many claimable jobs the store reports.
    ///
    /// NOTE(review): despite the name, this only counts and logs — it does
    /// not claim or re-enqueue anything. Monitoring aid while idle.
    async fn check_orphaned_jobs(&self) -> Result<(), WorkerError> {
        let count = self
            .store
            .count_claimable_jobs()
            .await
            .map_err(|e| WorkerError::StoreError(e.to_string()))?;

        if count > 0 {
            tracing::info!(
                "🔄 [WORKER_{}] Found {} claimable jobs",
                self.id,
                count
            );
        }

        Ok(())
    }
}
593
594/// Dedicated stale job checker
595struct StaleJobChecker<J, S>
596where
597    J: Send + Sync,
598    S: JobStore<J>,
599{
600    config: WorkerPoolConfig,
601    redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
602    store: Arc<S>,
603    _phantom: std::marker::PhantomData<J>,
604}
605
606impl<J, S> StaleJobChecker<J, S>
607where
608    J: Send + Sync + crate::job::Job,
609    S: JobStore<J>,
610{
611    fn new(
612        config: WorkerPoolConfig,
613        redis_queue: Arc<tokio::sync::Mutex<JobQueue>>,
614        store: Arc<S>,
615    ) -> Self {
616        Self {
617            config,
618            redis_queue,
619            store,
620            _phantom: std::marker::PhantomData,
621        }
622    }
623
624    async fn run(self) -> Result<(), WorkerError> {
625        tracing::info!(
626            "🔍 [STALE_CHECKER] Started - checking every {}s for jobs running longer than {}s",
627            self.config.stale_check_interval_seconds,
628            self.config.stale_job_threshold_seconds
629        );
630
631        let check_interval = Duration::from_secs(self.config.stale_check_interval_seconds);
632
633        loop {
634            tokio::time::sleep(check_interval).await;
635
636            if let Err(e) = self.check_and_recover_stale_jobs().await {
637                tracing::error!("❌ [STALE_CHECKER] Failed to check stale jobs: {}", e);
638            }
639        }
640    }
641
642    async fn check_and_recover_stale_jobs(&self) -> Result<(), WorkerError> {
643        let stale_jobs = self
644            .store
645            .find_stale_jobs(self.config.stale_job_threshold_seconds)
646            .await
647            .map_err(|e| WorkerError::StoreError(e.to_string()))?;
648
649        if stale_jobs.is_empty() {
650            tracing::debug!("✓ [STALE_CHECKER] No stale jobs found");
651            return Ok(());
652        }
653
654        tracing::warn!(
655            "⚠️  [STALE_CHECKER] Found {} stale jobs - recovering them",
656            stale_jobs.len()
657        );
658
659        for job in stale_jobs {
660            let job_id = job.id();
661
662            if job.can_retry() {
663                // Schedule retry
664                let next_retry_at = job.calculate_next_retry_at();
665                let error_message = format!(
666                    "Job was stuck in RUNNING state - likely due to worker crash"
667                );
668                let error_details = serde_json::json!({
669                    "error_code": "STALE_JOB_RECOVERY",
670                    "reason": "Job automatically recovered from RUNNING state",
671                    "threshold_seconds": self.config.stale_job_threshold_seconds,
672                });
673
674                self.store
675                    .schedule_retry(
676                        job_id,
677                        job.retry_count() + 1,
678                        next_retry_at,
679                        &error_message,
680                        error_details,
681                    )
682                    .await
683                    .map_err(|e| WorkerError::StoreError(e.to_string()))?;
684
685                tracing::info!(
686                    "✅ [STALE_CHECKER] Job {} scheduled for retry",
687                    job_id
688                );
689            } else {
690                // Max retries exceeded
691                let error_message = format!(
692                    "Job failed permanently: stuck in RUNNING state after {} retries",
693                    job.retry_count()
694                );
695                let error_details = serde_json::json!({
696                    "error_code": "STALE_JOB_MAX_RETRIES_EXCEEDED",
697                    "reason": "Job stuck in RUNNING state and exhausted all retries",
698                });
699
700                self.store
701                    .mark_job_failed(job_id, &error_message, error_details)
702                    .await
703                    .map_err(|e| WorkerError::StoreError(e.to_string()))?;
704
705                tracing::error!(
706                    "🔴 [STALE_CHECKER] Job {} marked as PERMANENTLY FAILED",
707                    job_id
708                );
709            }
710        }
711
712        Ok(())
713    }
714}
715
/// Worker errors
///
/// Top-level error type for the pool, workers, and stale-job checker.
/// Underlying errors are flattened to display strings rather than kept as
/// `source()` chains.
#[derive(Error, Debug)]
pub enum WorkerError {
    /// A `JobStore` operation failed (stringified `StoreError`).
    #[error("Store error: {0}")]
    StoreError(String),

    /// A Redis queue operation failed (stringified `QueueError`).
    #[error("Queue error: {0}")]
    QueueError(String),

    /// A spawned task panicked or was aborted (stringified `JoinError`).
    #[error("Worker failed: {0}")]
    WorkerFailed(String),

    /// Job execution failed.
    // NOTE(review): not constructed anywhere in this module — confirm
    // external use before removing.
    #[error("Execution error: {0}")]
    ExecutionError(String),
}
731
732impl From<QueueError> for WorkerError {
733    fn from(err: QueueError) -> Self {
734        WorkerError::QueueError(err.to_string())
735    }
736}
737
/// Store errors
///
/// Returned by [`JobStore`] implementations; the worker pool converts these
/// into `WorkerError::StoreError` strings at its boundary.
#[derive(Error, Debug)]
pub enum StoreError {
    /// Underlying persistence layer failed (connection, query, etc.).
    #[error("Database error: {0}")]
    DatabaseError(String),

    /// No job exists with the given ID (e.g. from `load_job`).
    #[error("Job not found: {0}")]
    JobNotFound(Uuid),

    /// Job payload could not be (de)serialized.
    #[error("Serialization error: {0}")]
    SerializationError(String),
}
750
/// Executor errors
///
/// Returned by [`JobExecutor::execute`]. Any variant triggers the worker's
/// retry/permanent-failure handling; the variant distinction is informational
/// (the worker currently stringifies the error either way).
#[derive(Error, Debug)]
pub enum ExecutorError {
    /// General business-logic failure.
    #[error("Execution failed: {0}")]
    ExecutionFailed(String),

    /// The job was cancelled mid-execution (soft-delete support).
    #[error("Job was cancelled")]
    JobCancelled,

    /// An upstream/external provider call failed.
    #[error("Provider error: {0}")]
    ProviderError(String),

    /// Job input failed validation before or during execution.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
766
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every documented default on `WorkerPoolConfig` so a changed
    /// default cannot slip in silently.
    #[test]
    fn test_worker_pool_config_defaults() {
        let WorkerPoolConfig {
            worker_count,
            queue_key,
            poll_timeout,
            max_retries,
            base_retry_delay,
            max_retry_delay,
            stale_job_threshold_seconds,
            stale_check_interval_seconds,
        } = WorkerPoolConfig::default();

        assert_eq!(worker_count, 5);
        assert_eq!(queue_key, "jobs:queue");
        assert_eq!(poll_timeout, 30);
        assert_eq!(max_retries, 3);
        assert_eq!(base_retry_delay, 60);
        assert_eq!(max_retry_delay, 3600);
        assert_eq!(stale_job_threshold_seconds, 7200);
        assert_eq!(stale_check_interval_seconds, 300);
    }
}