forge_runtime/jobs/
worker.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::sync::mpsc;
5use tracing::Instrument;
6use uuid::Uuid;
7
8use super::executor::JobExecutor;
9use super::queue::JobQueue;
10use super::registry::JobRegistry;
11
/// Tuning knobs for a job [`Worker`].
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Stable worker identity; a random UUID is generated when `None`.
    pub id: Option<Uuid>,
    /// Capability tags this worker claims jobs for (default: `"general"`).
    pub capabilities: Vec<String>,
    /// Maximum number of jobs executing concurrently (semaphore permits).
    pub max_concurrent: usize,
    /// Delay between successive queue polls in the run loop.
    pub poll_interval: Duration,
    /// Upper bound on jobs claimed per poll (further capped by free permits).
    pub batch_size: i32,
    /// How often the background maintenance task runs stale/expired cleanup.
    pub stale_cleanup_interval: Duration,
    /// Age passed to `release_stale`; beyond this a claim is considered stale.
    pub stale_threshold: chrono::Duration,
}
30
31impl Default for WorkerConfig {
32 fn default() -> Self {
33 Self {
34 id: None,
35 capabilities: vec!["general".to_string()],
36 max_concurrent: 10,
37 poll_interval: Duration::from_millis(100),
38 batch_size: 10,
39 stale_cleanup_interval: Duration::from_secs(60),
40 stale_threshold: chrono::Duration::minutes(5),
41 }
42 }
43}
44
/// A polling job worker: claims jobs from the queue matching its
/// capabilities and executes them concurrently up to `config.max_concurrent`.
pub struct Worker {
    /// Identity used when claiming jobs from the queue.
    id: Uuid,
    /// Runtime configuration; see [`WorkerConfig`].
    config: WorkerConfig,
    /// Queue handle; cloned into the background cleanup task.
    queue: JobQueue,
    /// Shared executor; cloned into each spawned per-job task.
    executor: Arc<JobExecutor>,
    /// Set by `run`; `shutdown` sends on this channel to stop the loop.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
53
54impl Worker {
55 pub fn new(
57 config: WorkerConfig,
58 queue: JobQueue,
59 registry: JobRegistry,
60 db_pool: sqlx::PgPool,
61 ) -> Self {
62 let id = config.id.unwrap_or_else(Uuid::new_v4);
63 let executor = Arc::new(JobExecutor::new(queue.clone(), registry, db_pool));
64
65 Self {
66 id,
67 config,
68 queue,
69 executor,
70 shutdown_tx: None,
71 }
72 }
73
74 pub fn id(&self) -> Uuid {
76 self.id
77 }
78
79 pub fn capabilities(&self) -> &[String] {
81 &self.config.capabilities
82 }
83
84 pub async fn run(&mut self) -> Result<(), WorkerError> {
86 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
87 self.shutdown_tx = Some(shutdown_tx);
88
89 let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
91
92 let cleanup_queue = self.queue.clone();
94 let cleanup_interval = self.config.stale_cleanup_interval;
95 let stale_threshold = self.config.stale_threshold;
96 tokio::spawn(async move {
97 loop {
98 tokio::time::sleep(cleanup_interval).await;
99
100 if let Err(e) = cleanup_queue.release_stale(stale_threshold).await {
102 tracing::warn!(error = %e, "Failed to cleanup stale jobs");
103 }
104
105 match cleanup_queue.cleanup_expired().await {
107 Ok(count) if count > 0 => {
108 tracing::debug!(count, "Cleaned up expired job records");
109 }
110 Err(e) => {
111 tracing::warn!(error = %e, "Failed to cleanup expired jobs");
112 }
113 _ => {}
114 }
115 }
116 });
117
118 tracing::debug!(
119 worker_id = %self.id,
120 capabilities = ?self.config.capabilities,
121 "Worker started"
122 );
123
124 loop {
125 tokio::select! {
126 _ = shutdown_rx.recv() => {
127 tracing::debug!(worker_id = %self.id, "Worker shutting down");
128 break;
129 }
130 _ = tokio::time::sleep(self.config.poll_interval) => {
131 let available = semaphore.available_permits();
133 if available == 0 {
134 continue;
135 }
136
137 let batch_size = (available as i32).min(self.config.batch_size);
138
139 let jobs = match self.queue.claim(
141 self.id,
142 &self.config.capabilities,
143 batch_size,
144 ).await {
145 Ok(jobs) => jobs,
146 Err(e) => {
147 tracing::warn!(error = %e, "Failed to claim jobs");
148 continue;
149 }
150 };
151
152 for job in jobs {
154 let permit = semaphore.clone().acquire_owned().await.expect("semaphore closed");
155 let executor = self.executor.clone();
156 let job_id = job.id;
157 let job_type = job.job_type.clone();
158
159 tokio::spawn(async move {
160 let start = std::time::Instant::now();
161 let span = tracing::info_span!(
162 "job.execute",
163 job_id = %job_id,
164 job_type = %job_type,
165 );
166
167 let result = executor.execute(&job).instrument(span).await;
168
169 let duration_secs = start.elapsed().as_secs_f64();
170
171 match &result {
172 super::executor::ExecutionResult::Completed { .. } => {
173 tracing::info!(job_id = %job_id, job_type = %job_type, duration_ms = (duration_secs * 1000.0) as u64, "Job completed");
174 crate::observability::record_job_execution(&job_type, "completed", duration_secs);
175 }
176 super::executor::ExecutionResult::Failed { error, retryable } => {
177 if *retryable {
178 tracing::warn!(job_id = %job_id, job_type = %job_type, error = %error, "Job failed, will retry");
179 crate::observability::record_job_execution(&job_type, "retrying", duration_secs);
180 } else {
181 tracing::error!(job_id = %job_id, job_type = %job_type, error = %error, "Job failed permanently");
182 crate::observability::record_job_execution(&job_type, "failed", duration_secs);
183 }
184 }
185 super::executor::ExecutionResult::TimedOut { retryable } => {
186 tracing::error!(job_id = %job_id, job_type = %job_type, will_retry = %retryable, "Job timed out");
187 crate::observability::record_job_execution(&job_type, "timeout", duration_secs);
188 }
189 super::executor::ExecutionResult::Cancelled { reason } => {
190 tracing::info!(job_id = %job_id, job_type = %job_type, reason = %reason, "Job cancelled");
191 crate::observability::record_job_execution(&job_type, "cancelled", duration_secs);
192 }
193 }
194
195 drop(permit);
196 });
197 }
198 }
199 }
200 }
201
202 Ok(())
203 }
204
205 pub async fn shutdown(&self) {
207 if let Some(ref tx) = self.shutdown_tx {
208 let _ = tx.send(()).await;
209 }
210 }
211}
212
/// Errors surfaced by [`Worker::run`].
///
/// NOTE(review): neither variant is constructed in this file — confirm the
/// intended producers (presumably queue/DB failures and handler failures).
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    /// A database-level failure, stringified.
    #[error("Database error: {0}")]
    Database(String),

    /// A job-execution failure, stringified.
    #[error("Job execution error: {0}")]
    Execution(String),
}
222
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults match the documented baseline values.
    #[test]
    fn test_worker_config_default() {
        let cfg = WorkerConfig::default();
        assert_eq!(cfg.max_concurrent, 10);
        assert_eq!(cfg.batch_size, 10);
        assert_eq!(cfg.capabilities, vec!["general".to_string()]);
    }

    // Struct-update syntax overrides only the named fields.
    #[test]
    fn test_worker_config_custom() {
        let cfg = WorkerConfig {
            max_concurrent: 4,
            capabilities: vec!["media".to_string(), "general".to_string()],
            ..WorkerConfig::default()
        };
        assert_eq!(cfg.max_concurrent, 4);
        assert_eq!(cfg.capabilities.len(), 2);
    }
}