aurora_db/workers/
executor.rs

1use super::job::Job;
2use super::queue::JobQueue;
3use crate::error::Result;
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use tokio::task::JoinHandle;
11use tokio::time::{interval, timeout};
12
/// Job handler function type.
///
/// A reference-counted, `Send + Sync` callable that takes a [`Job`] by value
/// and returns a boxed, pinned, `Send` future resolving to `Result<()>`.
/// `WorkerExecutor::register_handler` wraps user closures into this shape.
pub type JobHandler =
    Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
16
/// Worker configuration.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so that a
/// configuration can be logged and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkerConfig {
    /// Path of the directory used for worker storage.
    pub storage_path: String,
    /// Number of worker tasks spawned when the executor starts.
    pub concurrency: usize,
    /// Period, in milliseconds, between queue polls made by each worker.
    pub poll_interval_ms: u64,
    /// Period, in seconds, between runs of the completed-job cleanup task.
    pub cleanup_interval_seconds: u64,
}
25
26impl Default for WorkerConfig {
27    fn default() -> Self {
28        Self {
29            storage_path: "./aurora_workers".to_string(),
30            concurrency: 4,
31            poll_interval_ms: 100,
32            cleanup_interval_seconds: 3600, // 1 hour
33        }
34    }
35}
36
/// Worker executor that processes jobs.
///
/// Owns a set of background tasks (workers plus one cleanup task) that are
/// created by `start` and aborted by `stop`.
pub struct WorkerExecutor {
    /// Queue that workers dequeue jobs from and write job updates back to.
    queue: Arc<JobQueue>,
    /// Registered handlers, keyed by job type string.
    handlers: Arc<RwLock<HashMap<String, JobHandler>>>,
    /// Settings captured at construction time.
    config: WorkerConfig,
    /// Shared flag checked by every background task on each tick.
    running: Arc<RwLock<bool>>,
    /// Join handles of the spawned worker and cleanup tasks.
    worker_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
}
45
46impl WorkerExecutor {
47    pub fn new(queue: Arc<JobQueue>, config: WorkerConfig) -> Self {
48        Self {
49            queue,
50            handlers: Arc::new(RwLock::new(HashMap::new())),
51            config,
52            running: Arc::new(RwLock::new(false)),
53            worker_handles: Arc::new(RwLock::new(Vec::new())),
54        }
55    }
56
57    /// Register a job handler
58    pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
59    where
60        F: Fn(Job) -> Fut + Send + Sync + 'static,
61        Fut: Future<Output = Result<()>> + Send + 'static,
62    {
63        let handler = Arc::new(
64            move |job: Job| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
65                Box::pin(handler(job))
66            },
67        );
68
69        self.handlers.write().await.insert(job_type.into(), handler);
70    }
71
72    /// Start the worker executor
73    pub async fn start(&self) -> Result<()> {
74        let mut running = self.running.write().await;
75        if *running {
76            return Ok(());
77        }
78        *running = true;
79        drop(running);
80
81        // Spawn worker tasks
82        let mut handles = self.worker_handles.write().await;
83        for worker_id in 0..self.config.concurrency {
84            let handle = self.spawn_worker(worker_id);
85            handles.push(handle);
86        }
87
88        // Spawn cleanup task
89        let cleanup_handle = self.spawn_cleanup_task();
90        handles.push(cleanup_handle);
91
92        Ok(())
93    }
94
95    /// Stop the worker executor
96    pub async fn stop(&self) -> Result<()> {
97        let mut running = self.running.write().await;
98        *running = false;
99        drop(running);
100
101        // Cancel all worker tasks
102        let mut handles = self.worker_handles.write().await;
103        for handle in handles.drain(..) {
104            handle.abort();
105        }
106
107        Ok(())
108    }
109
110    /// Spawn a worker task
111    fn spawn_worker(&self, worker_id: usize) -> JoinHandle<()> {
112        let queue = Arc::clone(&self.queue);
113        let handlers = Arc::clone(&self.handlers);
114        let running = Arc::clone(&self.running);
115        let poll_interval = self.config.poll_interval_ms;
116
117        tokio::spawn(async move {
118            let mut interval = interval(Duration::from_millis(poll_interval));
119
120            loop {
121                interval.tick().await;
122
123                if !*running.read().await {
124                    break;
125                }
126
127                // Try to dequeue a job
128                match queue.dequeue().await {
129                    Ok(Some(mut job)) => {
130                        println!(
131                            "[Worker {}] Processing job: {} ({})",
132                            worker_id, job.id, job.job_type
133                        );
134
135                        // Get handler
136                        let handlers = handlers.read().await;
137                        let handler = handlers.get(&job.job_type);
138
139                        if let Some(handler) = handler {
140                            let handler = Arc::clone(handler);
141                            drop(handlers);
142
143                            // Execute job with timeout
144                            let result = if let Some(timeout_secs) = job.timeout_seconds {
145                                timeout(Duration::from_secs(timeout_secs), handler(job.clone()))
146                                    .await
147                            } else {
148                                Ok(handler(job.clone()).await)
149                            };
150
151                            match result {
152                                Ok(Ok(())) => {
153                                    job.mark_completed();
154                                    println!("[Worker {}] Job completed: {}", worker_id, job.id);
155                                }
156                                Ok(Err(e)) => {
157                                    job.mark_failed(e.to_string());
158                                    println!(
159                                        "[Worker {}] Job failed: {} - {}",
160                                        worker_id, job.id, e
161                                    );
162                                }
163                                Err(_) => {
164                                    job.mark_failed("Timeout".to_string());
165                                    println!("[Worker {}] Job timeout: {}", worker_id, job.id);
166                                }
167                            }
168
169                            // Update job status
170                            let job_id = job.id.clone();
171                            let _ = queue.update_job(&job_id, job).await;
172                        } else {
173                            let job_type = job.job_type.clone();
174                            job.mark_failed("No handler registered".to_string());
175                            let job_id = job.id.clone();
176                            let _ = queue.update_job(&job_id, job).await;
177                            println!(
178                                "[Worker {}] No handler for job type: {}",
179                                worker_id, job_type
180                            );
181                        }
182                    }
183                    Ok(None) => {
184                        // No jobs available
185                    }
186                    Err(e) => {
187                        eprintln!("[Worker {}] Error dequeuing job: {}", worker_id, e);
188                    }
189                }
190            }
191
192            println!("[Worker {}] Stopped", worker_id);
193        })
194    }
195
196    /// Spawn cleanup task
197    fn spawn_cleanup_task(&self) -> JoinHandle<()> {
198        let queue = Arc::clone(&self.queue);
199        let running = Arc::clone(&self.running);
200        let cleanup_interval = self.config.cleanup_interval_seconds;
201
202        tokio::spawn(async move {
203            let mut interval = interval(Duration::from_secs(cleanup_interval));
204
205            loop {
206                interval.tick().await;
207
208                if !*running.read().await {
209                    break;
210                }
211
212                match queue.cleanup_completed().await {
213                    Ok(count) => {
214                        if count > 0 {
215                            println!("[Cleanup] Removed {} completed jobs", count);
216                        }
217                    }
218                    Err(e) => {
219                        eprintln!("[Cleanup] Error: {}", e);
220                    }
221                }
222            }
223
224            println!("[Cleanup] Stopped");
225        })
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workers::job::{Job, JobStatus};
    use tempfile::TempDir;
    use tokio::time::sleep;

    /// Runs one job of type `"test"` through a started executor and checks
    /// that it ends up completed (or has already been cleaned up).
    #[tokio::test]
    async fn test_worker_execution() {
        let scratch = TempDir::new().unwrap();
        let storage_path = scratch.path().to_str().unwrap().to_string();
        let config = WorkerConfig {
            storage_path,
            concurrency: 2,
            poll_interval_ms: 50,
            cleanup_interval_seconds: 10, // Short interval for testing
        };

        let queue = Arc::new(JobQueue::new(config.storage_path.clone()).unwrap());
        let executor = WorkerExecutor::new(Arc::clone(&queue), config);

        // A handler that always succeeds.
        executor
            .register_handler("test", |_job| async { Ok(()) })
            .await;
        executor.start().await.unwrap();

        let job_id = queue.enqueue(Job::new("test")).await.unwrap();

        // Give the workers several poll periods to pick the job up.
        sleep(Duration::from_millis(300)).await;

        // The job is either reported as completed, or is already gone because
        // the cleanup task removed it.
        let status = queue.get_status(&job_id).await.unwrap();
        assert!(matches!(status, Some(JobStatus::Completed) | None));

        executor.stop().await.unwrap();
    }
}