1use crate::error::{QueueError, QueueResult};
4use crate::job::{Job, JobId};
5use crate::queue::Queue;
6use armature_log::{debug, info, warn};
7use std::collections::HashMap;
8use std::future::Future;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::RwLock;
13use tokio::task::JoinHandle;
14
15pub type JobHandler =
17 Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = QueueResult<()>> + Send>> + Send + Sync>;
18
/// Tunable settings for a `Worker`.
#[derive(Clone, Debug)]
pub struct WorkerConfig {
    /// Number of processor tasks spawned when the worker is started.
    pub concurrency: usize,

    /// How long a processor sleeps after finding the queue empty or after a
    /// dequeue error, before polling again.
    pub poll_interval: Duration,

    /// Upper bound on a single handler invocation; a job whose handler runs
    /// longer than this is reported to the queue as failed.
    pub job_timeout: Duration,

    /// When `true`, progress messages (job picked up, job completed, worker
    /// stopped, batch summaries) are printed to stdout.
    pub log_execution: bool,
}
34
35impl Default for WorkerConfig {
36 fn default() -> Self {
37 Self {
38 concurrency: 10,
39 poll_interval: Duration::from_secs(1),
40 job_timeout: Duration::from_secs(300), log_execution: true,
42 }
43 }
44}
45
46pub struct Worker {
48 queue: Queue,
49 handlers: Arc<RwLock<HashMap<String, JobHandler>>>,
50 config: WorkerConfig,
51 running: Arc<RwLock<bool>>,
52 handles: Vec<JoinHandle<()>>,
53}
54
55impl Worker {
56 pub fn new(queue: Queue) -> Self {
58 Self::with_config(queue, WorkerConfig::default())
59 }
60
61 pub fn with_config(queue: Queue, config: WorkerConfig) -> Self {
63 info!("Creating worker with concurrency: {}", config.concurrency);
64 debug!(
65 "Worker config - poll_interval: {:?}, job_timeout: {:?}",
66 config.poll_interval, config.job_timeout
67 );
68 Self {
69 queue,
70 handlers: Arc::new(RwLock::new(HashMap::new())),
71 config,
72 running: Arc::new(RwLock::new(false)),
73 handles: Vec::new(),
74 }
75 }
76
77 pub fn register_handler<F, Fut>(&mut self, job_type: impl Into<String>, handler: F)
99 where
100 F: Fn(Job) -> Fut + Send + Sync + 'static,
101 Fut: Future<Output = QueueResult<()>> + Send + 'static,
102 {
103 let wrapped_handler = Arc::new(
104 move |job: Job| -> Pin<Box<dyn Future<Output = QueueResult<()>> + Send>> {
105 Box::pin(handler(job))
106 },
107 );
108
109 let job_type = job_type.into();
110 let handlers = self.handlers.clone();
111
112 tokio::spawn(async move {
113 let mut handlers = handlers.write().await;
114 handlers.insert(job_type, wrapped_handler);
115 });
116 }
117
118 pub async fn start(&mut self) -> QueueResult<()> {
120 let mut running = self.running.write().await;
121 if *running {
122 warn!("Worker already running");
123 return Err(QueueError::WorkerAlreadyRunning);
124 }
125 *running = true;
126 drop(running);
127
128 info!(
129 "Starting worker with {} concurrent processors",
130 self.config.concurrency
131 );
132
133 for i in 0..self.config.concurrency {
135 let queue = self.queue.clone();
136 let handlers = self.handlers.clone();
137 let running = self.running.clone();
138 let poll_interval = self.config.poll_interval;
139 let job_timeout = self.config.job_timeout;
140 let log = self.config.log_execution;
141
142 let handle = tokio::spawn(async move {
143 while *running.read().await {
144 match queue.dequeue().await {
145 Ok(Some(job)) => {
146 let job_id = job.id;
147 let job_type = job.job_type.clone();
148
149 if log {
150 println!(
151 "[WORKER-{}] Processing job: {} (type: {})",
152 i, job_id, job_type
153 );
154 }
155
156 let handler = {
158 let handlers = handlers.read().await;
159 handlers.get(&job_type).cloned()
160 };
161
162 if let Some(handler) = handler {
163 let result =
165 tokio::time::timeout(job_timeout, handler(job.clone())).await;
166
167 match result {
168 Ok(Ok(())) => {
169 if let Err(e) = queue.complete(job_id).await {
171 eprintln!(
172 "[WORKER-{}] Failed to mark job as complete: {}",
173 i, e
174 );
175 } else if log {
176 println!(
177 "[WORKER-{}] Job {} completed successfully",
178 i, job_id
179 );
180 }
181 }
182 Ok(Err(e)) => {
183 eprintln!("[WORKER-{}] Job {} failed: {}", i, job_id, e);
185 if let Err(err) = queue.fail(job_id, e.to_string()).await {
186 eprintln!(
187 "[WORKER-{}] Failed to mark job as failed: {}",
188 i, err
189 );
190 }
191 }
192 Err(_) => {
193 eprintln!("[WORKER-{}] Job {} timed out", i, job_id);
195 if let Err(e) =
196 queue.fail(job_id, "Job timeout".to_string()).await
197 {
198 eprintln!(
199 "[WORKER-{}] Failed to mark job as failed: {}",
200 i, e
201 );
202 }
203 }
204 }
205 } else {
206 eprintln!("[WORKER-{}] No handler for job type: {}", i, job_type);
207 if let Err(e) = queue
208 .fail(job_id, format!("No handler for job type: {}", job_type))
209 .await
210 {
211 eprintln!("[WORKER-{}] Failed to mark job as failed: {}", i, e);
212 }
213 }
214 }
215 Ok(None) => {
216 tokio::time::sleep(poll_interval).await;
218 }
219 Err(e) => {
220 eprintln!("[WORKER-{}] Error dequeuing job: {}", i, e);
221 tokio::time::sleep(poll_interval).await;
222 }
223 }
224 }
225
226 if log {
227 println!("[WORKER-{}] Stopped", i);
228 }
229 });
230
231 self.handles.push(handle);
232 }
233
234 Ok(())
235 }
236
237 pub async fn process_batch(
261 &self,
262 job_type: &str,
263 max_batch_size: usize,
264 ) -> QueueResult<Vec<JobId>> {
265 use tokio::task::JoinSet;
266
267 let mut jobs = Vec::new();
269 for _ in 0..max_batch_size {
270 match self.queue.dequeue().await? {
271 Some(job) => {
272 if job.job_type == job_type {
273 jobs.push(job);
274 } else {
275 break;
278 }
279 }
280 None => break,
281 }
282 }
283
284 if jobs.is_empty() {
285 return Ok(Vec::new());
286 }
287
288 if self.config.log_execution {
289 println!(
290 "[BATCH] Processing {} jobs of type '{}'",
291 jobs.len(),
292 job_type
293 );
294 }
295
296 let handler = {
298 let handlers = self.handlers.read().await;
299 handlers.get(job_type).cloned()
300 };
301
302 let Some(handler) = handler else {
303 return Err(QueueError::NoHandler(job_type.to_string()));
304 };
305
306 let mut set = JoinSet::new();
308 for job in jobs {
309 let handler = handler.clone();
310 let queue = self.queue.clone();
311 let job_id = job.id;
312 let log = self.config.log_execution;
313 let timeout = self.config.job_timeout;
314
315 set.spawn(async move {
316 let result = tokio::time::timeout(timeout, handler(job.clone())).await;
317
318 match result {
319 Ok(Ok(())) => {
320 if let Err(e) = queue.complete(job_id).await {
322 eprintln!("[BATCH] Failed to mark job {} as complete: {}", job_id, e);
323 } else if log {
324 println!("[BATCH] Job {} completed successfully", job_id);
325 }
326 Ok(job_id)
327 }
328 Ok(Err(e)) => {
329 eprintln!("[BATCH] Job {} failed: {}", job_id, e);
331 if let Err(err) = queue.fail(job_id, e.to_string()).await {
332 eprintln!("[BATCH] Failed to mark job {} as failed: {}", job_id, err);
333 }
334 Err(e)
335 }
336 Err(_) => {
337 eprintln!("[BATCH] Job {} timed out", job_id);
339 if let Err(e) = queue
340 .fail(job_id, "Job execution timed out".to_string())
341 .await
342 {
343 eprintln!("[BATCH] Failed to mark job {} as failed: {}", job_id, e);
344 }
345 Err(QueueError::ExecutionFailed("Timeout".to_string()))
346 }
347 }
348 });
349 }
350
351 let mut processed = Vec::new();
353 while let Some(result) = set.join_next().await {
354 match result {
355 Ok(Ok(job_id)) => processed.push(job_id),
356 Ok(Err(_)) => {} Err(e) => eprintln!("[BATCH] Task join error: {}", e),
358 }
359 }
360
361 if self.config.log_execution {
362 println!(
363 "[BATCH] Batch complete: {}/{} jobs succeeded",
364 processed.len(),
365 processed.len()
366 );
367 }
368
369 Ok(processed)
370 }
371
372 pub fn register_cpu_intensive_handler<F>(&mut self, job_type: impl Into<String>, handler: F)
391 where
392 F: Fn(Job) -> QueueResult<()> + Send + Sync + 'static,
393 {
394 let handler = Arc::new(handler);
395
396 let wrapped = Arc::new(move |job: Job| {
397 let handler = handler.clone();
398 Box::pin(async move {
399 tokio::task::spawn_blocking(move || handler(job))
401 .await
402 .map_err(|e| QueueError::ExecutionFailed(e.to_string()))?
403 }) as Pin<Box<dyn Future<Output = QueueResult<()>> + Send>>
404 });
405
406 let mut handlers = tokio::runtime::Handle::current().block_on(self.handlers.write());
407 handlers.insert(job_type.into(), wrapped);
408 }
409
410 pub async fn stop(&mut self) -> QueueResult<()> {
411 let mut running = self.running.write().await;
412 if !*running {
413 return Err(QueueError::WorkerNotRunning);
414 }
415 *running = false;
416 drop(running);
417
418 if self.config.log_execution {
419 println!("[WORKER] Stopping...");
420 }
421
422 for handle in self.handles.drain(..) {
424 handle.abort();
425 }
426
427 if self.config.log_execution {
428 println!("[WORKER] Stopped");
429 }
430
431 Ok(())
432 }
433
434 pub async fn is_running(&self) -> bool {
436 *self.running.read().await
437 }
438}
439
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses ten processors and logs execution.
    #[test]
    fn test_worker_config() {
        let WorkerConfig {
            concurrency,
            log_execution,
            ..
        } = WorkerConfig::default();

        assert_eq!(concurrency, 10);
        assert!(log_execution);
    }

    /// A hand-built configuration keeps the values it was given.
    #[tokio::test]
    async fn test_worker_creation() {
        let poll_interval = Duration::from_millis(500);
        let job_timeout = Duration::from_secs(60);

        let config = WorkerConfig {
            concurrency: 5,
            poll_interval,
            job_timeout,
            log_execution: false,
        };

        assert_eq!(config.concurrency, 5);
    }
}
464}