// forge_runtime/jobs/worker.rs
use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::mpsc;
5use uuid::Uuid;
6
7use super::executor::JobExecutor;
8use super::queue::JobQueue;
9use super::registry::JobRegistry;
10
/// Tunable settings for a [`Worker`].
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Explicit worker id; a random UUID is generated when `None`.
    pub id: Option<Uuid>,
    /// Capability tags passed to the queue when claiming jobs.
    pub capabilities: Vec<String>,
    /// Maximum number of jobs executing concurrently (semaphore permits).
    pub max_concurrent: usize,
    /// Delay between queue polls in the run loop.
    pub poll_interval: Duration,
    /// Upper bound on jobs claimed per poll (further capped by free permits).
    pub batch_size: i32,
    /// How often the background maintenance task runs stale/expired cleanup.
    pub stale_cleanup_interval: Duration,
    /// Threshold handed to `JobQueue::release_stale` — presumably the age
    /// after which a claimed job is considered abandoned; confirm in queue.rs.
    pub stale_threshold: chrono::Duration,
}
29
30impl Default for WorkerConfig {
31 fn default() -> Self {
32 Self {
33 id: None,
34 capabilities: vec!["general".to_string()],
35 max_concurrent: 10,
36 poll_interval: Duration::from_millis(100),
37 batch_size: 10,
38 stale_cleanup_interval: Duration::from_secs(60),
39 stale_threshold: chrono::Duration::minutes(5),
40 }
41 }
42}
43
/// A queue worker: claims jobs matching its capabilities and executes them
/// concurrently, up to the configured permit limit.
pub struct Worker {
    // Stable identity used when claiming jobs from the queue.
    id: Uuid,
    config: WorkerConfig,
    // Shared queue handle; also cloned into the background cleanup task.
    queue: JobQueue,
    executor: Arc<JobExecutor>,
    // Populated by `run`; `shutdown` is a no-op until then.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
52
53impl Worker {
54 pub fn new(
56 config: WorkerConfig,
57 queue: JobQueue,
58 registry: JobRegistry,
59 db_pool: sqlx::PgPool,
60 ) -> Self {
61 let id = config.id.unwrap_or_else(Uuid::new_v4);
62 let executor = Arc::new(JobExecutor::new(queue.clone(), registry, db_pool));
63
64 Self {
65 id,
66 config,
67 queue,
68 executor,
69 shutdown_tx: None,
70 }
71 }
72
73 pub fn id(&self) -> Uuid {
75 self.id
76 }
77
78 pub fn capabilities(&self) -> &[String] {
80 &self.config.capabilities
81 }
82
83 pub async fn run(&mut self) -> Result<(), WorkerError> {
85 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
86 self.shutdown_tx = Some(shutdown_tx);
87
88 let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
90
91 let cleanup_queue = self.queue.clone();
93 let cleanup_interval = self.config.stale_cleanup_interval;
94 let stale_threshold = self.config.stale_threshold;
95 tokio::spawn(async move {
96 loop {
97 tokio::time::sleep(cleanup_interval).await;
98
99 if let Err(e) = cleanup_queue.release_stale(stale_threshold).await {
101 tracing::error!("Failed to cleanup stale jobs: {}", e);
102 }
103
104 match cleanup_queue.cleanup_expired().await {
106 Ok(count) if count > 0 => {
107 tracing::debug!("Cleaned up {} expired job records", count);
108 }
109 Err(e) => {
110 tracing::error!("Failed to cleanup expired jobs: {}", e);
111 }
112 _ => {}
113 }
114 }
115 });
116
117 tracing::info!(
118 worker_id = %self.id,
119 capabilities = ?self.config.capabilities,
120 "Worker started"
121 );
122
123 loop {
124 tokio::select! {
125 _ = shutdown_rx.recv() => {
126 tracing::info!(worker_id = %self.id, "Worker shutting down");
127 break;
128 }
129 _ = tokio::time::sleep(self.config.poll_interval) => {
130 let available = semaphore.available_permits();
132 if available == 0 {
133 continue;
134 }
135
136 let batch_size = (available as i32).min(self.config.batch_size);
137
138 let jobs = match self.queue.claim(
140 self.id,
141 &self.config.capabilities,
142 batch_size,
143 ).await {
144 Ok(jobs) => jobs,
145 Err(e) => {
146 tracing::error!("Failed to claim jobs: {}", e);
147 continue;
148 }
149 };
150
151 for job in jobs {
153 let permit = semaphore.clone().acquire_owned().await.unwrap();
154 let executor = self.executor.clone();
155 let job_id = job.id;
156 let job_type = job.job_type.clone();
157
158 tokio::spawn(async move {
159 tracing::debug!(
160 job_id = %job_id,
161 job_type = %job_type,
162 "Processing job"
163 );
164
165 let result = executor.execute(&job).await;
166
167 match &result {
168 super::executor::ExecutionResult::Completed { .. } => {
169 tracing::info!(
170 job_id = %job_id,
171 job_type = %job_type,
172 "Job completed"
173 );
174 }
175 super::executor::ExecutionResult::Failed { error, retryable } => {
176 if *retryable {
177 tracing::warn!(
178 job_id = %job_id,
179 job_type = %job_type,
180 error = %error,
181 "Job failed, will retry"
182 );
183 } else {
184 tracing::error!(
185 job_id = %job_id,
186 job_type = %job_type,
187 error = %error,
188 "Job failed permanently"
189 );
190 }
191 }
192 super::executor::ExecutionResult::TimedOut { retryable } => {
193 tracing::warn!(
194 job_id = %job_id,
195 job_type = %job_type,
196 will_retry = %retryable,
197 "Job timed out"
198 );
199 }
200 super::executor::ExecutionResult::Cancelled { reason } => {
201 tracing::info!(
202 job_id = %job_id,
203 job_type = %job_type,
204 reason = %reason,
205 "Job cancelled"
206 );
207 }
208 }
209
210 drop(permit); });
212 }
213 }
214 }
215 }
216
217 Ok(())
218 }
219
220 pub async fn shutdown(&self) {
222 if let Some(ref tx) = self.shutdown_tx {
223 let _ = tx.send(()).await;
224 }
225 }
226}
227
/// Errors a [`Worker`] can surface from its run loop.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    /// Underlying database failure, stringified by the caller.
    #[error("Database error: {0}")]
    Database(String),

    /// Failure while executing a job, stringified by the caller.
    #[error("Job execution error: {0}")]
    Execution(String),
}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// Every field of the default config is pinned, not just the first three.
    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();
        assert_eq!(config.id, None);
        assert_eq!(config.capabilities, vec!["general".to_string()]);
        assert_eq!(config.max_concurrent, 10);
        assert_eq!(config.poll_interval, Duration::from_millis(100));
        assert_eq!(config.batch_size, 10);
        assert_eq!(config.stale_cleanup_interval, Duration::from_secs(60));
        assert_eq!(config.stale_threshold, chrono::Duration::minutes(5));
    }

    /// Struct-update syntax overrides the named fields and leaves the rest at
    /// their defaults.
    #[test]
    fn test_worker_config_custom() {
        let config = WorkerConfig {
            capabilities: vec!["media".to_string(), "general".to_string()],
            max_concurrent: 4,
            ..Default::default()
        };
        assert_eq!(config.capabilities.len(), 2);
        assert_eq!(config.max_concurrent, 4);
        // Untouched fields keep their defaults.
        assert_eq!(config.id, None);
        assert_eq!(config.batch_size, 10);
    }
}