aurora_db/workers/
executor.rs1use super::job::Job;
2use super::queue::JobQueue;
3use crate::error::Result;
4use std::collections::HashMap;
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::RwLock;
10use tokio::task::JoinHandle;
11use tokio::time::{interval, timeout};
12
13pub type JobHandler =
15 Arc<dyn Fn(Job) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
16
17#[derive(Clone)]
19pub struct WorkerConfig {
20 pub storage_path: String,
21 pub concurrency: usize,
22 pub poll_interval_ms: u64,
23 pub cleanup_interval_seconds: u64,
24}
25
26impl Default for WorkerConfig {
27 fn default() -> Self {
28 Self {
29 storage_path: "./aurora_workers".to_string(),
30 concurrency: 4,
31 poll_interval_ms: 100,
32 cleanup_interval_seconds: 3600, }
34 }
35}
36
37pub struct WorkerExecutor {
39 queue: Arc<JobQueue>,
40 handlers: Arc<RwLock<HashMap<String, JobHandler>>>,
41 config: WorkerConfig,
42 running: Arc<RwLock<bool>>,
43 worker_handles: Arc<RwLock<Vec<JoinHandle<()>>>>,
44}
45
46impl WorkerExecutor {
47 pub fn new(queue: Arc<JobQueue>, config: WorkerConfig) -> Self {
48 Self {
49 queue,
50 handlers: Arc::new(RwLock::new(HashMap::new())),
51 config,
52 running: Arc::new(RwLock::new(false)),
53 worker_handles: Arc::new(RwLock::new(Vec::new())),
54 }
55 }
56
57 pub async fn register_handler<F, Fut>(&self, job_type: impl Into<String>, handler: F)
59 where
60 F: Fn(Job) -> Fut + Send + Sync + 'static,
61 Fut: Future<Output = Result<()>> + Send + 'static,
62 {
63 let handler = Arc::new(
64 move |job: Job| -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
65 Box::pin(handler(job))
66 },
67 );
68
69 self.handlers.write().await.insert(job_type.into(), handler);
70 }
71
72 pub async fn start(&self) -> Result<()> {
74 let mut running = self.running.write().await;
75 if *running {
76 return Ok(());
77 }
78 *running = true;
79 drop(running);
80
81 let mut handles = self.worker_handles.write().await;
83 for worker_id in 0..self.config.concurrency {
84 let handle = self.spawn_worker(worker_id);
85 handles.push(handle);
86 }
87
88 let cleanup_handle = self.spawn_cleanup_task();
90 handles.push(cleanup_handle);
91
92 Ok(())
93 }
94
95 pub async fn stop(&self) -> Result<()> {
97 let mut running = self.running.write().await;
98 *running = false;
99 drop(running);
100
101 let mut handles = self.worker_handles.write().await;
103 for handle in handles.drain(..) {
104 handle.abort();
105 }
106
107 Ok(())
108 }
109
110 fn spawn_worker(&self, worker_id: usize) -> JoinHandle<()> {
112 let queue = Arc::clone(&self.queue);
113 let handlers = Arc::clone(&self.handlers);
114 let running = Arc::clone(&self.running);
115 let poll_interval = self.config.poll_interval_ms;
116
117 tokio::spawn(async move {
118 let mut interval = interval(Duration::from_millis(poll_interval));
119
120 loop {
121 interval.tick().await;
122
123 if !*running.read().await {
124 break;
125 }
126
127 match queue.dequeue().await {
129 Ok(Some(mut job)) => {
130 println!(
131 "[Worker {}] Processing job: {} ({})",
132 worker_id, job.id, job.job_type
133 );
134
135 let handlers = handlers.read().await;
137 let handler = handlers.get(&job.job_type);
138
139 if let Some(handler) = handler {
140 let handler = Arc::clone(handler);
141 drop(handlers);
142
143 let result = if let Some(timeout_secs) = job.timeout_seconds {
145 timeout(Duration::from_secs(timeout_secs), handler(job.clone()))
146 .await
147 } else {
148 Ok(handler(job.clone()).await)
149 };
150
151 match result {
152 Ok(Ok(())) => {
153 job.mark_completed();
154 println!("[Worker {}] Job completed: {}", worker_id, job.id);
155 }
156 Ok(Err(e)) => {
157 job.mark_failed(e.to_string());
158 println!(
159 "[Worker {}] Job failed: {} - {}",
160 worker_id, job.id, e
161 );
162 }
163 Err(_) => {
164 job.mark_failed("Timeout".to_string());
165 println!("[Worker {}] Job timeout: {}", worker_id, job.id);
166 }
167 }
168
169 let job_id = job.id.clone();
171 let _ = queue.update_job(&job_id, job).await;
172 } else {
173 let job_type = job.job_type.clone();
174 job.mark_failed("No handler registered".to_string());
175 let job_id = job.id.clone();
176 let _ = queue.update_job(&job_id, job).await;
177 println!(
178 "[Worker {}] No handler for job type: {}",
179 worker_id, job_type
180 );
181 }
182 }
183 Ok(None) => {
184 }
186 Err(e) => {
187 eprintln!("[Worker {}] Error dequeuing job: {}", worker_id, e);
188 }
189 }
190 }
191
192 println!("[Worker {}] Stopped", worker_id);
193 })
194 }
195
196 fn spawn_cleanup_task(&self) -> JoinHandle<()> {
198 let queue = Arc::clone(&self.queue);
199 let running = Arc::clone(&self.running);
200 let cleanup_interval = self.config.cleanup_interval_seconds;
201
202 tokio::spawn(async move {
203 let mut interval = interval(Duration::from_secs(cleanup_interval));
204
205 loop {
206 interval.tick().await;
207
208 if !*running.read().await {
209 break;
210 }
211
212 match queue.cleanup_completed().await {
213 Ok(count) => {
214 if count > 0 {
215 println!("[Cleanup] Removed {} completed jobs", count);
216 }
217 }
218 Err(e) => {
219 eprintln!("[Cleanup] Error: {}", e);
220 }
221 }
222 }
223
224 println!("[Cleanup] Stopped");
225 })
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use crate::workers::job::{Job, JobStatus};
233 use tempfile::TempDir;
234 use tokio::time::sleep;
235
236 #[tokio::test]
237 async fn test_worker_execution() {
238 let temp_dir = TempDir::new().unwrap();
239 let config = WorkerConfig {
240 storage_path: temp_dir.path().to_str().unwrap().to_string(),
241 concurrency: 2,
242 poll_interval_ms: 50,
243 cleanup_interval_seconds: 10, };
245
246 let queue = Arc::new(JobQueue::new(config.storage_path.clone()).unwrap());
247 let executor = WorkerExecutor::new(Arc::clone(&queue), config);
248
249 executor
251 .register_handler("test", |_job| async { Ok(()) })
252 .await;
253
254 executor.start().await.unwrap();
256
257 let job = Job::new("test");
259 let job_id = queue.enqueue(job).await.unwrap();
260
261 sleep(Duration::from_millis(300)).await;
263
264 let status = queue.get_status(&job_id).await.unwrap();
266 assert!(matches!(status, Some(JobStatus::Completed) | None));
268
269 executor.stop().await.unwrap();
270 }
271}