Skip to main content

oxidite_queue/
worker.rs

1use std::sync::Arc;
2use std::time::Duration;
3use tokio::time::sleep;
4use crate::queue::Queue;
5
6
7/// Worker for processing jobs
8pub struct Worker {
9    queue: Arc<Queue>,
10    worker_count: usize,
11    poll_interval: Duration,
12}
13
14impl Worker {
15    pub fn new(queue: Arc<Queue>) -> Self {
16        Self {
17            queue,
18            worker_count: 4,
19            poll_interval: Duration::from_secs(1),
20        }
21    }
22
23    pub fn worker_count(mut self, count: usize) -> Self {
24        self.worker_count = count;
25        self
26    }
27
28    pub fn poll_interval(mut self, interval: Duration) -> Self {
29        self.poll_interval = interval;
30        self
31    }
32
33    pub async fn start(self) {
34        println!("Starting {} workers...", self.worker_count);
35        
36        let mut handles = vec![];
37        
38        for i in 0..self.worker_count {
39            let queue = self.queue.clone();
40            let poll_interval = self.poll_interval;
41            
42            let handle = tokio::spawn(async move {
43                loop {
44                    match queue.dequeue().await {
45                        Ok(Some(job)) => {
46                            println!("Worker {}: Processing job {}", i, job.id);
47                            
48                            // In a real implementation, deserialize and execute the job
49                            // For now, just mark as complete
50                            sleep(Duration::from_millis(100)).await;
51                            
52                            if let Err(e) = queue.complete(&job.id).await {
53                                eprintln!("Worker {}: Failed to mark job as complete: {}", i, e);
54                            }
55                        }
56                        Ok(None) => {
57                            // No jobs available, sleep
58                            sleep(poll_interval).await;
59                        }
60                        Err(e) => {
61                            eprintln!("Worker {}: Error dequeuing job: {}", i, e);
62                            sleep(poll_interval).await;
63                        }
64                    }
65                }
66            });
67            
68            handles.push(handle);
69        }
70
71        // Wait for all workers (they run forever)
72        for handle in handles {
73            let _ = handle.await;
74        }
75    }
76}