Skip to main content

forge_runtime/jobs/
worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::mpsc;
5use uuid::Uuid;
6
7use super::executor::JobExecutor;
8use super::queue::JobQueue;
9use super::registry::JobRegistry;
10
11/// Worker configuration.
12#[derive(Debug, Clone)]
13pub struct WorkerConfig {
14    /// Worker ID (auto-generated if not provided).
15    pub id: Option<Uuid>,
16    /// Worker capabilities (e.g., ["general", "media"]).
17    pub capabilities: Vec<String>,
18    /// Maximum concurrent jobs.
19    pub max_concurrent: usize,
20    /// Poll interval when queue is empty.
21    pub poll_interval: Duration,
22    /// Batch size for claiming jobs.
23    pub batch_size: i32,
24    /// Stale job cleanup interval.
25    pub stale_cleanup_interval: Duration,
26    /// Stale job threshold.
27    pub stale_threshold: chrono::Duration,
28}
29
30impl Default for WorkerConfig {
31    fn default() -> Self {
32        Self {
33            id: None,
34            capabilities: vec!["general".to_string()],
35            max_concurrent: 10,
36            poll_interval: Duration::from_millis(100),
37            batch_size: 10,
38            stale_cleanup_interval: Duration::from_secs(60),
39            stale_threshold: chrono::Duration::minutes(5),
40        }
41    }
42}
43
44/// Background job worker.
45pub struct Worker {
46    id: Uuid,
47    config: WorkerConfig,
48    queue: JobQueue,
49    executor: Arc<JobExecutor>,
50    shutdown_tx: Option<mpsc::Sender<()>>,
51}
52
53impl Worker {
54    /// Create a new worker.
55    pub fn new(
56        config: WorkerConfig,
57        queue: JobQueue,
58        registry: JobRegistry,
59        db_pool: sqlx::PgPool,
60    ) -> Self {
61        let id = config.id.unwrap_or_else(Uuid::new_v4);
62        let executor = Arc::new(JobExecutor::new(queue.clone(), registry, db_pool));
63
64        Self {
65            id,
66            config,
67            queue,
68            executor,
69            shutdown_tx: None,
70        }
71    }
72
73    /// Get worker ID.
74    pub fn id(&self) -> Uuid {
75        self.id
76    }
77
78    /// Get worker capabilities.
79    pub fn capabilities(&self) -> &[String] {
80        &self.config.capabilities
81    }
82
83    /// Run the worker (blocks until shutdown).
84    pub async fn run(&mut self) -> Result<(), WorkerError> {
85        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
86        self.shutdown_tx = Some(shutdown_tx);
87
88        // Semaphore to limit concurrent jobs
89        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
90
91        // Spawn stale and expired cleanup task
92        let cleanup_queue = self.queue.clone();
93        let cleanup_interval = self.config.stale_cleanup_interval;
94        let stale_threshold = self.config.stale_threshold;
95        tokio::spawn(async move {
96            loop {
97                tokio::time::sleep(cleanup_interval).await;
98
99                // Release stale jobs back to pending
100                if let Err(e) = cleanup_queue.release_stale(stale_threshold).await {
101                    tracing::error!("Failed to cleanup stale jobs: {}", e);
102                }
103
104                // Delete expired job records
105                match cleanup_queue.cleanup_expired().await {
106                    Ok(count) if count > 0 => {
107                        tracing::debug!("Cleaned up {} expired job records", count);
108                    }
109                    Err(e) => {
110                        tracing::error!("Failed to cleanup expired jobs: {}", e);
111                    }
112                    _ => {}
113                }
114            }
115        });
116
117        tracing::info!(
118            worker_id = %self.id,
119            capabilities = ?self.config.capabilities,
120            "Worker started"
121        );
122
123        loop {
124            tokio::select! {
125                _ = shutdown_rx.recv() => {
126                    tracing::info!(worker_id = %self.id, "Worker shutting down");
127                    break;
128                }
129                _ = tokio::time::sleep(self.config.poll_interval) => {
130                    // Calculate how many jobs we can claim
131                    let available = semaphore.available_permits();
132                    if available == 0 {
133                        continue;
134                    }
135
136                    let batch_size = (available as i32).min(self.config.batch_size);
137
138                    // Claim jobs
139                    let jobs = match self.queue.claim(
140                        self.id,
141                        &self.config.capabilities,
142                        batch_size,
143                    ).await {
144                        Ok(jobs) => jobs,
145                        Err(e) => {
146                            tracing::error!("Failed to claim jobs: {}", e);
147                            continue;
148                        }
149                    };
150
151                    // Process each job
152                    for job in jobs {
153                        let permit = semaphore.clone().acquire_owned().await.unwrap();
154                        let executor = self.executor.clone();
155                        let job_id = job.id;
156                        let job_type = job.job_type.clone();
157
158                        tokio::spawn(async move {
159                            tracing::debug!(
160                                job_id = %job_id,
161                                job_type = %job_type,
162                                "Processing job"
163                            );
164
165                            let result = executor.execute(&job).await;
166
167                            match &result {
168                                super::executor::ExecutionResult::Completed { .. } => {
169                                    tracing::info!(
170                                        job_id = %job_id,
171                                        job_type = %job_type,
172                                        "Job completed"
173                                    );
174                                }
175                                super::executor::ExecutionResult::Failed { error, retryable } => {
176                                    if *retryable {
177                                        tracing::warn!(
178                                            job_id = %job_id,
179                                            job_type = %job_type,
180                                            error = %error,
181                                            "Job failed, will retry"
182                                        );
183                                    } else {
184                                        tracing::error!(
185                                            job_id = %job_id,
186                                            job_type = %job_type,
187                                            error = %error,
188                                            "Job failed permanently"
189                                        );
190                                    }
191                                }
192                                super::executor::ExecutionResult::TimedOut { retryable } => {
193                                    tracing::warn!(
194                                        job_id = %job_id,
195                                        job_type = %job_type,
196                                        will_retry = %retryable,
197                                        "Job timed out"
198                                    );
199                                }
200                                super::executor::ExecutionResult::Cancelled { reason } => {
201                                    tracing::info!(
202                                        job_id = %job_id,
203                                        job_type = %job_type,
204                                        reason = %reason,
205                                        "Job cancelled"
206                                    );
207                                }
208                            }
209
210                            drop(permit); // Release semaphore
211                        });
212                    }
213                }
214            }
215        }
216
217        Ok(())
218    }
219
220    /// Request graceful shutdown.
221    pub async fn shutdown(&self) {
222        if let Some(ref tx) = self.shutdown_tx {
223            let _ = tx.send(()).await;
224        }
225    }
226}
227
228/// Worker errors.
229#[derive(Debug, thiserror::Error)]
230pub enum WorkerError {
231    #[error("Database error: {0}")]
232    Database(String),
233
234    #[error("Job execution error: {0}")]
235    Execution(String),
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes only the "general" capability and
    /// uses 10 for both the concurrency limit and the batch size.
    #[test]
    fn test_worker_config_default() {
        let WorkerConfig {
            capabilities,
            max_concurrent,
            batch_size,
            ..
        } = WorkerConfig::default();

        assert_eq!(capabilities, vec!["general".to_string()]);
        assert_eq!(max_concurrent, 10);
        assert_eq!(batch_size, 10);
    }

    /// Overriding selected fields via struct-update syntax keeps the overrides.
    #[test]
    fn test_worker_config_custom() {
        let capabilities = vec!["media".to_string(), "general".to_string()];
        let config = WorkerConfig {
            capabilities,
            max_concurrent: 4,
            ..WorkerConfig::default()
        };

        assert_eq!(config.capabilities.len(), 2);
        assert_eq!(config.max_concurrent, 4);
    }
}
260}