use chrono::Utc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};

use super::{
    WorkerConfig, WorkerRegistry, WorkerResult, retry::RetryPolicy, worker::WorkerContext,
};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::Storage;

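/// Executes a single job by dispatching it to the worker registered for its
/// method, enforcing the configured timeout, and persisting the resulting
/// state transition (`Succeeded`, `AwaitingRetry`, or `Failed`).
///
/// A minimal usage sketch, assuming an in-memory storage backend and a
/// hypothetical `EmailWorker` registered for the `"send_email"` method
/// (imports and error handling elided):
///
/// ```ignore
/// let storage = Arc::new(MemoryStorage::new());
/// let mut registry = WorkerRegistry::new();
/// registry.register(EmailWorker::default()); // hypothetical worker type
///
/// let processor = JobProcessor::new(
///     Arc::new(registry),
///     storage.clone(),
///     WorkerConfig::new("worker-1"),
/// );
///
/// let job = Job::new("send_email", vec!["user@example.com".to_string()]);
/// storage.enqueue(&job).await?;
/// processor.process_job(job).await?;
/// ```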
pub struct JobProcessor {
    worker_registry: Arc<WorkerRegistry>,
    storage: Arc<dyn Storage>,
    retry_policy: RetryPolicy,
    worker_config: WorkerConfig,
}

impl JobProcessor {
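    /// Creates a processor that uses the default retry policy.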
    pub fn new(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy: RetryPolicy::default(),
            worker_config,
        }
    }

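    /// Creates a processor with an explicit retry policy.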
    pub fn with_retry_policy(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
        retry_policy: RetryPolicy,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy,
            worker_config,
        }
    }

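    /// Returns the identifier of the worker this processor runs as.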
    pub fn get_worker_id(&self) -> &str {
        &self.worker_config.worker_id
    }

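    /// Processes a single job end to end: looks up the worker for the job's
    /// method, transitions the job to `Processing`, executes it under the
    /// configured timeout, and records the outcome.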
    pub async fn process_job(&self, mut job: Job) -> Result<()> {
        let job_id = job.id.clone();
        let method = job.method.clone();

        info!("Starting job processing: {} ({})", job_id, method);

        // Resolve the worker registered for this job's method.
        let worker = match self.worker_registry.get_worker(&method) {
            Some(worker) => worker,
            None => {
                error!("No worker found for method: {}", method);
                return self
                    .fail_job_permanently(
                        &mut job,
                        format!("No worker registered for method: {}", method),
                        None,
                        0,
                    )
                    .await;
            }
        };

        // Transition to Processing unless the job is already being processed.
        if !matches!(job.state, JobState::Processing { .. }) {
            let processing_state = JobState::processing(
                &self.worker_config.worker_id,
                &self.worker_config.server_name,
            );

            if let Err(e) = job.set_state(processing_state) {
                error!("Failed to set job state to Processing: {}", e);
                return Err(e);
            }

            if let Err(e) = self.storage.update(&job).await {
                error!("Failed to update job state in storage: {}", e);
                return Err(QmlError::StorageError {
                    message: e.to_string(),
                });
            }
        }

        let retry_count = self.extract_retry_count(&job);

        // Build the execution context, carrying retry information forward when
        // this is not the first attempt.
        let context = if retry_count > 0 {
            let previous_exception = self.extract_previous_exception(&job);
            WorkerContext::retry_from(
                self.worker_config.clone(),
                retry_count + 1,
                previous_exception,
            )
        } else {
            WorkerContext::new(self.worker_config.clone())
        };

        // Execute the worker, enforcing the configured job timeout.
        let start_time = Utc::now();
        let execution_result = match tokio::time::timeout(
            self.worker_config.job_timeout.to_std().expect("job timeout must be non-negative"),
            worker.execute(&job, &context),
        )
        .await
        {
            Ok(result) => result,
            Err(_) => {
                warn!(
                    "Job {} timed out after {:?}",
                    job_id, self.worker_config.job_timeout
                );
                return self.handle_job_timeout(&mut job, retry_count).await;
            }
        };

        let duration = (Utc::now() - start_time).num_milliseconds() as u64;

        // Persist the outcome of the execution.
        match execution_result {
            Ok(WorkerResult::Success {
                result, metadata, ..
            }) => {
                info!("Job {} completed successfully in {}ms", job_id, duration);
                self.complete_job_successfully(&mut job, result, duration, metadata)
                    .await
            }
            Ok(WorkerResult::Retry {
                error, retry_at, ..
            }) => {
                warn!("Job {} failed and will be retried: {}", job_id, error);
                self.handle_job_retry(&mut job, error, retry_at, retry_count)
                    .await
            }
            Ok(WorkerResult::Failure {
                error, context: _, ..
            }) => {
                error!("Job {} failed permanently: {}", job_id, error);
                self.fail_job_permanently(&mut job, error, None, retry_count)
                    .await
            }
            Err(e) => {
                error!("Job {} execution error: {}", job_id, e);
                self.handle_execution_error(&mut job, e, retry_count).await
            }
        }
    }

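    /// Marks the job as `Succeeded`, attaches execution metadata, and persists it.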
    async fn complete_job_successfully(
        &self,
        job: &mut Job,
        result: Option<String>,
        duration_ms: u64,
        metadata: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        if job.state.is_final() {
            debug!(
                "Job {} is already in a final state, skipping success",
                job.id
            );
            return Ok(());
        }

        let succeeded_state = JobState::succeeded(duration_ms, result);

        if let Err(e) = job.set_state(succeeded_state) {
            error!("Failed to set job state to Succeeded: {}", e);
            return Err(e);
        }

        // Preserve worker-provided metadata under an "exec_" prefix.
        for (key, value) in metadata {
            job.add_metadata(&format!("exec_{}", key), value);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        Ok(())
    }

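    /// Records the failure and, when the retry policy allows another attempt,
    /// schedules the job for retry; otherwise fails it permanently.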
    async fn handle_job_retry(
        &self,
        job: &mut Job,
        error: String,
        retry_at: Option<chrono::DateTime<Utc>>,
        current_retry_count: u32,
    ) -> Result<()> {
        if job.state.is_final() {
            debug!("Job {} is already in a final state, skipping retry", job.id);
            return Ok(());
        }

        let next_attempt = current_retry_count + 1;

        if !self.retry_policy.should_retry(None, next_attempt) {
            debug!(
                "Retry limit exceeded for job {}, failing permanently",
                job.id
            );
            return self
                .fail_job_permanently(job, error, None, current_retry_count)
                .await;
        }

        // Record the failure before transitioning to AwaitingRetry.
        let failed_state = JobState::failed(error.clone(), None, current_retry_count);
        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Prefer the worker-requested retry time, then the retry policy's
        // schedule, and fall back to a 60-second delay.
        let retry_time = retry_at
            .or_else(|| self.retry_policy.calculate_retry_time(next_attempt))
            .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));

        let retry_state = JobState::awaiting_retry(retry_time, next_attempt, &error);

        if let Err(e) = job.set_state(retry_state) {
            error!("Failed to set job state to AwaitingRetry: {}", e);
            return Err(e);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        info!(
            "Job {} scheduled for retry #{} at {}",
            job.id, next_attempt, retry_time
        );
        Ok(())
    }

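    /// Marks the job as permanently `Failed` and persists the final state.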
    async fn fail_job_permanently(
        &self,
        job: &mut Job,
        error: String,
        stack_trace: Option<String>,
        retry_count: u32,
    ) -> Result<()> {
        if job.state.is_final() {
            debug!(
                "Job {} is already in a final state, skipping failure",
                job.id
            );
            return Ok(());
        }

        let failed_state = JobState::failed(error, stack_trace, retry_count);

        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        error!(
            "Job {} failed permanently after {} attempts",
            job.id,
            retry_count + 1
        );
        Ok(())
    }

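    /// Handles a job that exceeded the configured timeout, retrying it if the
    /// retry policy permits another attempt.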
    async fn handle_job_timeout(&self, job: &mut Job, retry_count: u32) -> Result<()> {
        let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);

        let next_attempt = retry_count + 1;
        if self
            .retry_policy
            .should_retry(Some("TimeoutError"), next_attempt)
        {
            self.handle_job_retry(job, timeout_error, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, timeout_error, None, retry_count)
                .await
        }
    }

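    /// Classifies an execution error and either retries the job or fails it
    /// permanently, depending on the retry policy.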
    async fn handle_execution_error(
        &self,
        job: &mut Job,
        error: QmlError,
        retry_count: u32,
    ) -> Result<()> {
        let error_type = match &error {
            QmlError::StorageError { .. } => "StorageError",
            QmlError::WorkerError { .. } => "WorkerError",
            QmlError::TimeoutError { .. } => "TimeoutError",
            _ => "UnknownError",
        };

        let error_message = error.to_string();
        let next_attempt = retry_count + 1;

        if self
            .retry_policy
            .should_retry(Some(error_type), next_attempt)
        {
            self.handle_job_retry(job, error_message, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, error_message, None, retry_count)
                .await
        }
    }

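    /// Returns the retry count recorded in the job's current state.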
    fn extract_retry_count(&self, job: &Job) -> u32 {
        match &job.state {
            JobState::AwaitingRetry { retry_count, .. } => *retry_count,
            JobState::Failed { retry_count, .. } => *retry_count,
            _ => 0,
        }
    }

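    /// Returns the most recent exception message from the job's state, if any.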
    fn extract_previous_exception(&self, job: &Job) -> Option<String> {
        match &job.state {
            JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
            JobState::Failed { exception, .. } => Some(exception.clone()),
            _ => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::MemoryStorage;
    use async_trait::async_trait;
    use std::sync::Arc;

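    /// Minimal worker used to exercise the success, retry, and failure paths.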
    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            if self.should_succeed {
                Ok(WorkerResult::success(Some("Test result".to_string()), 100))
            } else if self.should_retry {
                Ok(WorkerResult::retry("Test error".to_string(), None))
            } else {
                Ok(WorkerResult::failure("Permanent failure".to_string()))
            }
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", true, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, true));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        storage.enqueue(&job).await.unwrap();

        processor.process_job(job).await.unwrap();

        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Failed { .. }));
    }
}