1use std::sync::Arc;
2use std::time::Duration;
3use tokio::time::sleep;
4use crate::queue::Queue;
5
6
7pub struct Worker {
9 queue: Arc<Queue>,
10 worker_count: usize,
11 poll_interval: Duration,
12}
13
14impl Worker {
15 pub fn new(queue: Arc<Queue>) -> Self {
16 Self {
17 queue,
18 worker_count: 4,
19 poll_interval: Duration::from_secs(1),
20 }
21 }
22
23 pub fn worker_count(mut self, count: usize) -> Self {
24 self.worker_count = count;
25 self
26 }
27
28 pub fn poll_interval(mut self, interval: Duration) -> Self {
29 self.poll_interval = interval;
30 self
31 }
32
33 pub async fn start(self) {
34 println!("Starting {} workers...", self.worker_count);
35
36 let mut handles = vec![];
37
38 for i in 0..self.worker_count {
39 let queue = self.queue.clone();
40 let poll_interval = self.poll_interval;
41
42 let handle = tokio::spawn(async move {
43 loop {
44 match queue.dequeue().await {
45 Ok(Some(job)) => {
46 println!("Worker {}: Processing job {}", i, job.id);
47
48 sleep(Duration::from_millis(100)).await;
51
52 if let Err(e) = queue.complete(&job.id).await {
53 eprintln!("Worker {}: Failed to mark job as complete: {}", i, e);
54 }
55 }
56 Ok(None) => {
57 sleep(poll_interval).await;
59 }
60 Err(e) => {
61 eprintln!("Worker {}: Error dequeuing job: {}", i, e);
62 sleep(poll_interval).await;
63 }
64 }
65 }
66 });
67
68 handles.push(handle);
69 }
70
71 for handle in handles {
73 let _ = handle.await;
74 }
75 }
76}