qml_rs/processing/processor.rs

//! Job processor for executing individual jobs
//!
//! This module contains the JobProcessor that handles the execution lifecycle
//! of individual jobs, including state transitions and retry logic.
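//!
//! # Example
//!
//! A minimal end-to-end sketch, mirroring the tests at the bottom of this
//! file (`MyWorker` is a placeholder for any `Worker` implementation, and
//! `"worker-1"` is an arbitrary worker ID):
//!
//! ```rust,ignore
//! let storage = Arc::new(MemoryStorage::new());
//! let mut registry = WorkerRegistry::new();
//! registry.register(MyWorker::default());
//! let processor =
//!     JobProcessor::new(Arc::new(registry), storage.clone(), WorkerConfig::new("worker-1"));
//!
//! // Jobs are stored first, then processed; the final state lands in storage.
//! let job = Job::new("my_method", vec!["arg".to_string()]);
//! storage.enqueue(&job).await?;
//! processor.process_job(job).await?;
//! ```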

use chrono::Utc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};

use super::{
    WorkerConfig, WorkerRegistry, WorkerResult, retry::RetryPolicy, worker::WorkerContext,
};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::Storage;

/// Job processor that executes jobs and manages their lifecycle
pub struct JobProcessor {
    worker_registry: Arc<WorkerRegistry>,
    storage: Arc<dyn Storage>,
    retry_policy: RetryPolicy,
    worker_config: WorkerConfig,
}

impl JobProcessor {
    /// Create a new job processor with the default retry policy
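    ///
    /// A construction sketch (uses this crate's `MemoryStorage`, as in the
    /// tests below; the registry and storage are shared via `Arc`):
    ///
    /// ```rust,ignore
    /// let processor = JobProcessor::new(
    ///     Arc::new(registry),
    ///     Arc::new(MemoryStorage::new()),
    ///     WorkerConfig::new("worker-1"),
    /// );
    /// ```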
    pub fn new(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy: RetryPolicy::default(),
            worker_config,
        }
    }

    /// Create a new job processor with a custom retry policy
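    ///
    /// A sketch with a fixed-interval strategy (the same policy used by
    /// `test_job_retry` below):
    ///
    /// ```rust,ignore
    /// let policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
    /// let processor = JobProcessor::with_retry_policy(registry, storage, config, policy);
    /// ```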
    pub fn with_retry_policy(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
        retry_policy: RetryPolicy,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy,
            worker_config,
        }
    }

    /// Get the worker ID for this processor
    pub fn get_worker_id(&self) -> &str {
        &self.worker_config.worker_id
    }

    /// Process a single job through its full lifecycle: transition it to
    /// `Processing`, execute it under the configured timeout, and record the
    /// outcome (success, scheduled retry, or permanent failure) in storage
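    ///
    /// A usage sketch (as in the tests below, the job is enqueued first so
    /// that state updates can be persisted):
    ///
    /// ```rust,ignore
    /// storage.enqueue(&job).await?;
    /// processor.process_job(job).await?;
    /// ```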
    pub async fn process_job(&self, mut job: Job) -> Result<()> {
        let job_id = job.id.clone();
        let method = job.method.clone();

        info!("Starting job processing: {} ({})", job_id, method);

        // Check if we have a worker for this job method
        let worker = match self.worker_registry.get_worker(&method) {
            Some(worker) => worker,
            None => {
                error!("No worker found for method: {}", method);
                return self
                    .fail_job_permanently(
                        &mut job,
                        format!("No worker registered for method: {}", method),
                        None,
                        0,
                    )
                    .await;
            }
        };

        // Update job state to Processing (if not already)
        if !matches!(job.state, JobState::Processing { .. }) {
            let processing_state = JobState::processing(
                &self.worker_config.worker_id,
                &self.worker_config.server_name,
            );

            if let Err(e) = job.set_state(processing_state) {
                error!("Failed to set job state to Processing: {}", e);
                return Err(e);
            }

            // Save the updated state
            if let Err(e) = self.storage.update(&job).await {
                error!("Failed to update job state in storage: {}", e);
                return Err(QmlError::StorageError {
                    message: e.to_string(),
                });
            }
        }

        // Get retry count from job state
        let retry_count = self.extract_retry_count(&job);

        // Create worker context
        let context = if retry_count > 0 {
            let previous_exception = self.extract_previous_exception(&job);
            WorkerContext::retry_from(
                self.worker_config.clone(),
                retry_count + 1,
                previous_exception,
            )
        } else {
            WorkerContext::new(self.worker_config.clone())
        };

        // Execute the job, bounding its runtime with the configured timeout.
        // `to_std()` only fails for negative durations, so a valid
        // `job_timeout` makes this `unwrap` safe.
        let start_time = Utc::now();
        let execution_result = match tokio::time::timeout(
            self.worker_config.job_timeout.to_std().unwrap(),
            worker.execute(&job, &context),
        )
        .await
        {
            Ok(result) => result,
            Err(_) => {
                warn!(
                    "Job {} timed out after {:?}",
                    job_id, self.worker_config.job_timeout
                );
                return self.handle_job_timeout(&mut job, retry_count).await;
            }
        };

        let duration = (Utc::now() - start_time).num_milliseconds() as u64;

        // Handle the execution result
        match execution_result {
            Ok(WorkerResult::Success {
                result, metadata, ..
            }) => {
                info!("Job {} completed successfully in {}ms", job_id, duration);
                self.complete_job_successfully(&mut job, result, duration, metadata)
                    .await
            }
            Ok(WorkerResult::Retry {
                error, retry_at, ..
            }) => {
                warn!("Job {} failed and will be retried: {}", job_id, error);
                self.handle_job_retry(&mut job, error, retry_at, retry_count)
                    .await
            }
            Ok(WorkerResult::Failure {
                error, context: _, ..
            }) => {
                error!("Job {} failed permanently: {}", job_id, error);
                self.fail_job_permanently(&mut job, error, None, retry_count)
                    .await
            }
            Err(e) => {
                error!("Job {} execution error: {}", job_id, e);
                self.handle_execution_error(&mut job, e, retry_count).await
            }
        }
    }

    /// Complete a job successfully
    async fn complete_job_successfully(
        &self,
        job: &mut Job,
        result: Option<String>,
        duration_ms: u64,
        metadata: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        // Check if job is already in a final state
        if job.state.is_final() {
            debug!(
                "Job {} is already in a final state, skipping success",
                job.id
            );
            return Ok(());
        }

        let succeeded_state = JobState::succeeded(duration_ms, result);

        if let Err(e) = job.set_state(succeeded_state) {
            error!("Failed to set job state to Succeeded: {}", e);
            return Err(e);
        }

        // Add execution metadata
        for (key, value) in metadata {
            job.add_metadata(&format!("exec_{}", key), value);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        Ok(())
    }

    /// Handle job retry
    async fn handle_job_retry(
        &self,
        job: &mut Job,
        error: String,
        retry_at: Option<chrono::DateTime<Utc>>,
        current_retry_count: u32,
    ) -> Result<()> {
        // Check if job is already in a final state
        if job.state.is_final() {
            debug!("Job {} is already in a final state, skipping retry", job.id);
            return Ok(());
        }

        let next_attempt = current_retry_count + 1;

        // Check if we should retry based on policy
        if !self.retry_policy.should_retry(None, next_attempt) {
            debug!(
                "Retry limit exceeded for job {}, failing permanently",
                job.id
            );
            return self
                .fail_job_permanently(job, error, None, current_retry_count)
                .await;
        }

        // First transition to Failed state
        let failed_state = JobState::failed(error.clone(), None, current_retry_count);
        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Calculate the retry time: prefer the worker-supplied `retry_at`,
        // then the policy's computed time, falling back to 60 seconds out
        let retry_time = retry_at
            .or_else(|| self.retry_policy.calculate_retry_time(next_attempt))
            .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));

        // Then transition to AwaitingRetry
        let retry_state = JobState::awaiting_retry(retry_time, next_attempt, &error);

        if let Err(e) = job.set_state(retry_state) {
            error!("Failed to set job state to AwaitingRetry: {}", e);
            return Err(e);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        info!(
            "Job {} scheduled for retry #{} at {}",
            job.id, next_attempt, retry_time
        );
        Ok(())
    }

    /// Fail a job permanently
    async fn fail_job_permanently(
        &self,
        job: &mut Job,
        error: String,
        stack_trace: Option<String>,
        retry_count: u32,
    ) -> Result<()> {
        // Check if job is already in a final state
        if job.state.is_final() {
            debug!(
                "Job {} is already in a final state, skipping failure",
                job.id
            );
            return Ok(());
        }

        let failed_state = JobState::failed(error, stack_trace, retry_count);

        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        error!(
            "Job {} failed permanently after {} attempts",
            job.id,
            retry_count + 1
        );
        Ok(())
    }

    /// Handle job timeout
    async fn handle_job_timeout(&self, job: &mut Job, retry_count: u32) -> Result<()> {
        let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);

        let next_attempt = retry_count + 1;
        if self
            .retry_policy
            .should_retry(Some("TimeoutError"), next_attempt)
        {
            self.handle_job_retry(job, timeout_error, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, timeout_error, None, retry_count)
                .await
        }
    }

    /// Handle execution errors
    async fn handle_execution_error(
        &self,
        job: &mut Job,
        error: QmlError,
        retry_count: u32,
    ) -> Result<()> {
        let error_type = match &error {
            QmlError::StorageError { .. } => "StorageError",
            QmlError::WorkerError { .. } => "WorkerError",
            QmlError::TimeoutError { .. } => "TimeoutError",
            _ => "UnknownError",
        };

        let error_message = error.to_string();
        let next_attempt = retry_count + 1;

        if self
            .retry_policy
            .should_retry(Some(error_type), next_attempt)
        {
            self.handle_job_retry(job, error_message, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, error_message, None, retry_count)
                .await
        }
    }

    /// Extract retry count from job state
    fn extract_retry_count(&self, job: &Job) -> u32 {
        match &job.state {
            JobState::AwaitingRetry { retry_count, .. } => *retry_count,
            JobState::Failed { retry_count, .. } => *retry_count,
            _ => 0,
        }
    }

    /// Extract previous exception from job state
    fn extract_previous_exception(&self, job: &Job) -> Option<String> {
        match &job.state {
            JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
            JobState::Failed { exception, .. } => Some(exception.clone()),
            _ => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::MemoryStorage;
    use async_trait::async_trait;
    use std::sync::Arc;

    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            if self.should_succeed {
                Ok(WorkerResult::success(Some("Test result".to_string()), 100))
            } else if self.should_retry {
                Ok(WorkerResult::retry("Test error".to_string(), None))
            } else {
                Ok(WorkerResult::failure("Permanent failure".to_string()))
            }
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", true, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job is marked as succeeded
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, true));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job is awaiting retry
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job failed permanently
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Failed { .. }));
    }
}