//! Job processor: dispatches jobs to registered workers, enforces the configured
//! timeout, and persists state transitions (success, scheduled retry, failure).

use chrono::Utc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};

use super::{
    WorkerConfig, WorkerRegistry, WorkerResult, retry::RetryPolicy, worker::WorkerContext,
};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::Storage;

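/// Executes jobs by dispatching them to registered workers and recording each
/// state transition (Processing, Succeeded, AwaitingRetry, Failed) in storage.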
pub struct JobProcessor {
    worker_registry: Arc<WorkerRegistry>,
    storage: Arc<dyn Storage>,
    retry_policy: RetryPolicy,
    worker_config: WorkerConfig,
}

impl JobProcessor {
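    /// Creates a processor that uses the default retry policy.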
    pub fn new(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy: RetryPolicy::default(),
            worker_config,
        }
    }

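    /// Creates a processor with a custom retry policy.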
    pub fn with_retry_policy(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
        retry_policy: RetryPolicy,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy,
            worker_config,
        }
    }

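    /// Processes a single job end to end: resolves the worker for the job's
    /// method, marks the job as Processing, runs the worker under the
    /// configured timeout, and persists the resulting state.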
    pub async fn process_job(&self, mut job: Job) -> Result<()> {
        let job_id = job.id.clone();
        let method = job.method.clone();

        info!("Starting job processing: {} ({})", job_id, method);

        // Look up the worker registered for this job's method; without one the
        // job can never succeed, so fail it permanently right away.
        let worker = match self.worker_registry.get_worker(&method) {
            Some(worker) => worker,
            None => {
                error!("No worker found for method: {}", method);
                return self
                    .fail_job_permanently(
                        &mut job,
                        format!("No worker registered for method: {}", method),
                        None,
                        0,
                    )
                    .await;
            }
        };

        // Mark the job as Processing and persist the transition before the
        // worker runs.
        let processing_state = JobState::processing(
            &self.worker_config.worker_id,
            &self.worker_config.server_name,
        );

        if let Err(e) = job.set_state(processing_state) {
            error!("Failed to set job state to Processing: {}", e);
            return Err(e);
        }

        if let Err(e) = self.storage.update(&job).await {
            error!("Failed to update job state in storage: {}", e);
            return Err(QmlError::StorageError {
                message: e.to_string(),
            });
        }

        let retry_count = self.extract_retry_count(&job);

        // Build the execution context, carrying forward the retry count and the
        // previous attempt's exception when this is a retry.
        let context = if retry_count > 0 {
            let previous_exception = self.extract_previous_exception(&job);
            WorkerContext::retry_from(
                self.worker_config.clone(),
                retry_count + 1,
                previous_exception,
            )
        } else {
            WorkerContext::new(self.worker_config.clone())
        };

        let start_time = Utc::now();
        // Run the worker under the configured timeout; timeouts are handled on
        // their own path. job_timeout is assumed to be a non-negative duration.
        let execution_result = match tokio::time::timeout(
            self.worker_config
                .job_timeout
                .to_std()
                .expect("job_timeout must be a non-negative duration"),
            worker.execute(&job, &context),
        )
        .await
        {
            Ok(result) => result,
            Err(_) => {
                warn!(
                    "Job {} timed out after {:?}",
                    job_id, self.worker_config.job_timeout
                );
                return self.handle_job_timeout(&mut job, retry_count).await;
            }
        };

        let duration = (Utc::now() - start_time).num_milliseconds() as u64;

        // Map the worker's result onto the job's next state.
        match execution_result {
            Ok(WorkerResult::Success {
                result, metadata, ..
            }) => {
                info!("Job {} completed successfully in {}ms", job_id, duration);
                self.complete_job_successfully(&mut job, result, duration, metadata)
                    .await
            }
            Ok(WorkerResult::Retry {
                error, retry_at, ..
            }) => {
                warn!("Job {} failed and will be retried: {}", job_id, error);
                self.handle_job_retry(&mut job, error, retry_at, retry_count)
                    .await
            }
            Ok(WorkerResult::Failure {
                error, context: _, ..
            }) => {
                error!("Job {} failed permanently: {}", job_id, error);
                self.fail_job_permanently(&mut job, error, None, retry_count)
                    .await
            }
            Err(e) => {
                error!("Job {} execution error: {}", job_id, e);
                self.handle_execution_error(&mut job, e, retry_count).await
            }
        }
    }

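    /// Marks the job as Succeeded, attaches execution metadata, and persists it.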
    async fn complete_job_successfully(
        &self,
        job: &mut Job,
        result: Option<String>,
        duration_ms: u64,
        metadata: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        let succeeded_state = JobState::succeeded(duration_ms, result);

        if let Err(e) = job.set_state(succeeded_state) {
            error!("Failed to set job state to Succeeded: {}", e);
            return Err(e);
        }

        // Preserve worker-provided metadata on the job, namespaced with an
        // "exec_" prefix.
        for (key, value) in metadata {
            job.add_metadata(&format!("exec_{}", key), value);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        Ok(())
    }

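    /// Schedules the job for another attempt, or fails it permanently if the
    /// retry policy has been exhausted.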
    async fn handle_job_retry(
        &self,
        job: &mut Job,
        error: String,
        retry_at: Option<chrono::DateTime<Utc>>,
        current_retry_count: u32,
    ) -> Result<()> {
        let next_attempt = current_retry_count + 1;

        if !self.retry_policy.should_retry(None, next_attempt) {
            debug!(
                "Retry limit exceeded for job {}, failing permanently",
                job.id
            );
            return self
                .fail_job_permanently(job, error, None, current_retry_count)
                .await;
        }

        // Record the failure first so the exception and retry count are
        // captured, then transition to AwaitingRetry.
        let failed_state = JobState::failed(error.clone(), None, current_retry_count);
        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Prefer the retry time requested by the worker, then the policy's
        // schedule, and finally a fixed 60-second fallback.
        let retry_time = retry_at
            .or_else(|| self.retry_policy.calculate_retry_time(next_attempt))
            .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));

        let retry_state = JobState::awaiting_retry(retry_time, next_attempt, &error);

        if let Err(e) = job.set_state(retry_state) {
            error!("Failed to set job state to AwaitingRetry: {}", e);
            return Err(e);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        info!(
            "Job {} scheduled for retry #{} at {}",
            job.id, next_attempt, retry_time
        );
        Ok(())
    }

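    /// Moves the job into the Failed state and persists it.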
    async fn fail_job_permanently(
        &self,
        job: &mut Job,
        error: String,
        stack_trace: Option<String>,
        retry_count: u32,
    ) -> Result<()> {
        let failed_state = JobState::failed(error, stack_trace, retry_count);

        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        error!(
            "Job {} failed permanently after {} attempts",
            job.id,
            retry_count + 1
        );
        Ok(())
    }

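    /// Handles a job that exceeded the configured timeout, retrying only if the
    /// policy allows timeout errors to be retried.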
    async fn handle_job_timeout(&self, job: &mut Job, retry_count: u32) -> Result<()> {
        let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);

        let next_attempt = retry_count + 1;
        if self
            .retry_policy
            .should_retry(Some("TimeoutError"), next_attempt)
        {
            self.handle_job_retry(job, timeout_error, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, timeout_error, None, retry_count)
                .await
        }
    }

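    /// Handles an error returned by the worker itself, retrying or failing
    /// based on the error type and the retry policy.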
    async fn handle_execution_error(
        &self,
        job: &mut Job,
        error: QmlError,
        retry_count: u32,
    ) -> Result<()> {
        // Classify the error so the retry policy can make a type-aware decision.
        let error_type = match &error {
            QmlError::StorageError { .. } => "StorageError",
            QmlError::WorkerError { .. } => "WorkerError",
            QmlError::TimeoutError { .. } => "TimeoutError",
            _ => "UnknownError",
        };

        let error_message = error.to_string();
        let next_attempt = retry_count + 1;

        if self
            .retry_policy
            .should_retry(Some(error_type), next_attempt)
        {
            self.handle_job_retry(job, error_message, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, error_message, None, retry_count)
                .await
        }
    }

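    /// Reads the retry count from the job's current state, defaulting to zero.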
    fn extract_retry_count(&self, job: &Job) -> u32 {
        match &job.state {
            JobState::AwaitingRetry { retry_count, .. } => *retry_count,
            JobState::Failed { retry_count, .. } => *retry_count,
            _ => 0,
        }
    }

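    /// Returns the exception recorded by the previous attempt, if any.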
    fn extract_previous_exception(&self, job: &Job) -> Option<String> {
        match &job.state {
            JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
            JobState::Failed { exception, .. } => Some(exception.clone()),
            _ => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::MemoryStorage;
    use async_trait::async_trait;
    use std::sync::Arc;

    /// Worker stub whose outcome (success, retry, or failure) is fixed by flags.
    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            if self.should_succeed {
                Ok(WorkerResult::success(Some("Test result".to_string()), 100))
            } else if self.should_retry {
                Ok(WorkerResult::retry("Test error".to_string(), None))
            } else {
                Ok(WorkerResult::failure("Permanent failure".to_string()))
            }
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", true, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        // The worker succeeds, so the job should end up in the Succeeded state.
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, true));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        // The worker asks for a retry, so the job should be awaiting retry.
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        // The worker reports a permanent failure, so the job should be Failed.
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Failed { .. }));
    }
}