qml_rs/processing/
processor.rs

//! Job processor for executing individual jobs
//!
//! This module contains the JobProcessor that handles the execution lifecycle
//! of individual jobs, including state transitions and retry logic.
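//!
//! # Example
//!
//! A minimal sketch of wiring a processor to the in-memory storage backend,
//! assuming a `Worker` implementation for the job's method (`MyWorker` below is
//! hypothetical; see `TestWorker` in this module's tests for a minimal one):
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! // Register a worker for the method name the job will carry.
//! let mut registry = WorkerRegistry::new();
//! registry.register(MyWorker::default());
//! let registry = Arc::new(registry);
//!
//! // Wire the processor to a storage backend and a worker configuration.
//! let storage = Arc::new(MemoryStorage::new());
//! let config = WorkerConfig::new("worker-1");
//! let processor = JobProcessor::new(registry, storage.clone(), config);
//!
//! // Enqueue a job and drive it through its full lifecycle.
//! let job = Job::new("my_method", vec!["arg1".to_string()]);
//! storage.enqueue(&job).await.unwrap();
//! processor.process_job(job).await.unwrap();
//! ```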

use chrono::Utc;
use std::sync::Arc;
use tracing::{debug, error, info, warn};

use super::{
    WorkerConfig, WorkerRegistry, WorkerResult, retry::RetryPolicy, worker::WorkerContext,
};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::Storage;

/// Job processor that executes jobs and manages their lifecycle
pub struct JobProcessor {
    worker_registry: Arc<WorkerRegistry>,
    storage: Arc<dyn Storage>,
    retry_policy: RetryPolicy,
    worker_config: WorkerConfig,
}

impl JobProcessor {
    /// Create a new job processor with the default retry policy
    pub fn new(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy: RetryPolicy::default(),
            worker_config,
        }
    }

    /// Create a new job processor with a custom retry policy
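    ///
    /// A minimal sketch, assuming the fixed retry strategy used in this module's
    /// tests (`registry`, `storage`, and `config` as in [`JobProcessor::new`]):
    ///
    /// ```rust,ignore
    /// let retry_policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
    /// let processor = JobProcessor::with_retry_policy(registry, storage, config, retry_policy);
    /// ```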
    pub fn with_retry_policy(
        worker_registry: Arc<WorkerRegistry>,
        storage: Arc<dyn Storage>,
        worker_config: WorkerConfig,
        retry_policy: RetryPolicy,
    ) -> Self {
        Self {
            worker_registry,
            storage,
            retry_policy,
            worker_config,
        }
    }

    /// Process a single job
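    ///
    /// The job is moved to `Processing` and executed under the configured
    /// timeout; depending on the worker's result and the retry policy, it then
    /// transitions to `Succeeded`, `AwaitingRetry`, or `Failed`.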
    pub async fn process_job(&self, mut job: Job) -> Result<()> {
        let job_id = job.id.clone();
        let method = job.method.clone();

        info!("Starting job processing: {} ({})", job_id, method);

        // Check if we have a worker for this job method
        let worker = match self.worker_registry.get_worker(&method) {
            Some(worker) => worker,
            None => {
                error!("No worker found for method: {}", method);
                return self
                    .fail_job_permanently(
                        &mut job,
                        format!("No worker registered for method: {}", method),
                        None,
                        0,
                    )
                    .await;
            }
        };

        // Capture retry metadata before leaving the AwaitingRetry/Failed state,
        // since the Processing state no longer carries it
        let retry_count = self.extract_retry_count(&job);
        let previous_exception = self.extract_previous_exception(&job);

        // Update job state to Processing
        let processing_state = JobState::processing(
            &self.worker_config.worker_id,
            &self.worker_config.server_name,
        );

        if let Err(e) = job.set_state(processing_state) {
            error!("Failed to set job state to Processing: {}", e);
            return Err(e);
        }

        // Save the updated state
        if let Err(e) = self.storage.update(&job).await {
            error!("Failed to update job state in storage: {}", e);
            return Err(QmlError::StorageError {
                message: e.to_string(),
            });
        }

        // Create worker context, carrying the retry history forward on retries
        let context = if retry_count > 0 {
            WorkerContext::retry_from(
                self.worker_config.clone(),
                retry_count + 1,
                previous_exception,
            )
        } else {
            WorkerContext::new(self.worker_config.clone())
        };

        // Execute the job
        let start_time = Utc::now();
        let execution_result = match tokio::time::timeout(
            self.worker_config.job_timeout.to_std().unwrap(),
            worker.execute(&job, &context),
        )
        .await
        {
            Ok(result) => result,
            Err(_) => {
                warn!(
                    "Job {} timed out after {:?}",
                    job_id, self.worker_config.job_timeout
                );
                return self.handle_job_timeout(&mut job, retry_count).await;
            }
        };

        let duration = (Utc::now() - start_time).num_milliseconds() as u64;

        // Handle the execution result
        match execution_result {
            Ok(WorkerResult::Success {
                result, metadata, ..
            }) => {
                info!("Job {} completed successfully in {}ms", job_id, duration);
                self.complete_job_successfully(&mut job, result, duration, metadata)
                    .await
            }
            Ok(WorkerResult::Retry {
                error, retry_at, ..
            }) => {
                warn!("Job {} failed and will be retried: {}", job_id, error);
                self.handle_job_retry(&mut job, error, retry_at, retry_count)
                    .await
            }
            Ok(WorkerResult::Failure {
                error, context: _, ..
            }) => {
                error!("Job {} failed permanently: {}", job_id, error);
                self.fail_job_permanently(&mut job, error, None, retry_count)
                    .await
            }
            Err(e) => {
                error!("Job {} execution error: {}", job_id, e);
                self.handle_execution_error(&mut job, e, retry_count).await
            }
        }
    }

    /// Complete a job successfully
    async fn complete_job_successfully(
        &self,
        job: &mut Job,
        result: Option<String>,
        duration_ms: u64,
        metadata: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        let succeeded_state = JobState::succeeded(duration_ms, result);

        if let Err(e) = job.set_state(succeeded_state) {
            error!("Failed to set job state to Succeeded: {}", e);
            return Err(e);
        }

        // Add execution metadata
        for (key, value) in metadata {
            job.add_metadata(&format!("exec_{}", key), value);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        Ok(())
    }

    /// Handle job retry
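    ///
    /// The job first transitions to `Failed` (recording the error) and then to
    /// `AwaitingRetry`, scheduled at the worker-suggested `retry_at`, the
    /// policy-calculated time, or a 60-second fallback.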
    async fn handle_job_retry(
        &self,
        job: &mut Job,
        error: String,
        retry_at: Option<chrono::DateTime<Utc>>,
        current_retry_count: u32,
    ) -> Result<()> {
        let next_attempt = current_retry_count + 1;

        // Check if we should retry based on policy
        if !self.retry_policy.should_retry(None, next_attempt) {
            debug!(
                "Retry limit exceeded for job {}, failing permanently",
                job.id
            );
            return self
                .fail_job_permanently(job, error, None, current_retry_count)
                .await;
        }

        // First transition to Failed state
        let failed_state = JobState::failed(error.clone(), None, current_retry_count);
        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Calculate retry time
        let retry_time = retry_at
            .or_else(|| self.retry_policy.calculate_retry_time(next_attempt))
            .unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));

        // Then transition to AwaitingRetry
        let retry_state = JobState::awaiting_retry(retry_time, next_attempt, &error);

        if let Err(e) = job.set_state(retry_state) {
            error!("Failed to set job state to AwaitingRetry: {}", e);
            return Err(e);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        info!(
            "Job {} scheduled for retry #{} at {}",
            job.id, next_attempt, retry_time
        );
        Ok(())
    }

    /// Fail a job permanently
    async fn fail_job_permanently(
        &self,
        job: &mut Job,
        error: String,
        stack_trace: Option<String>,
        retry_count: u32,
    ) -> Result<()> {
        let failed_state = JobState::failed(error, stack_trace, retry_count);

        if let Err(e) = job.set_state(failed_state) {
            error!("Failed to set job state to Failed: {}", e);
            return Err(e);
        }

        // Update in storage
        self.storage
            .update(job)
            .await
            .map_err(|e| QmlError::StorageError {
                message: e.to_string(),
            })?;

        error!(
            "Job {} failed permanently after {} attempts",
            job.id,
            retry_count + 1
        );
        Ok(())
    }

    /// Handle job timeout
    async fn handle_job_timeout(&self, job: &mut Job, retry_count: u32) -> Result<()> {
        let timeout_error = format!("Job timed out after {:?}", self.worker_config.job_timeout);

        let next_attempt = retry_count + 1;
        if self
            .retry_policy
            .should_retry(Some("TimeoutError"), next_attempt)
        {
            self.handle_job_retry(job, timeout_error, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, timeout_error, None, retry_count)
                .await
        }
    }

    /// Handle execution errors
    async fn handle_execution_error(
        &self,
        job: &mut Job,
        error: QmlError,
        retry_count: u32,
    ) -> Result<()> {
        let error_type = match &error {
            QmlError::StorageError { .. } => "StorageError",
            QmlError::WorkerError { .. } => "WorkerError",
            QmlError::TimeoutError { .. } => "TimeoutError",
            _ => "UnknownError",
        };

        let error_message = error.to_string();
        let next_attempt = retry_count + 1;

        if self
            .retry_policy
            .should_retry(Some(error_type), next_attempt)
        {
            self.handle_job_retry(job, error_message, None, retry_count)
                .await
        } else {
            self.fail_job_permanently(job, error_message, None, retry_count)
                .await
        }
    }

    /// Extract retry count from job state
    fn extract_retry_count(&self, job: &Job) -> u32 {
        match &job.state {
            JobState::AwaitingRetry { retry_count, .. } => *retry_count,
            JobState::Failed { retry_count, .. } => *retry_count,
            _ => 0,
        }
    }

    /// Extract previous exception from job state
    fn extract_previous_exception(&self, job: &Job) -> Option<String> {
        match &job.state {
            JobState::AwaitingRetry { last_exception, .. } => Some(last_exception.clone()),
            JobState::Failed { exception, .. } => Some(exception.clone()),
            _ => None,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::processing::{RetryStrategy, Worker};
    use crate::storage::MemoryStorage;
    use async_trait::async_trait;
    use std::sync::Arc;

    struct TestWorker {
        method: String,
        should_succeed: bool,
        should_retry: bool,
    }

    impl TestWorker {
        fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
            Self {
                method: method.to_string(),
                should_succeed,
                should_retry,
            }
        }
    }

    #[async_trait]
    impl Worker for TestWorker {
        async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
            if self.should_succeed {
                Ok(WorkerResult::success(Some("Test result".to_string()), 100))
            } else if self.should_retry {
                Ok(WorkerResult::retry("Test error".to_string(), None))
            } else {
                Ok(WorkerResult::failure("Permanent failure".to_string()))
            }
        }

        fn method_name(&self) -> &str {
            &self.method
        }
    }

    #[tokio::test]
    async fn test_successful_job_processing() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", true, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job is marked as succeeded
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Succeeded { .. }));
    }

    #[tokio::test]
    async fn test_job_retry() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, true));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let retry_policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
        let processor =
            JobProcessor::with_retry_policy(registry, storage.clone(), config, retry_policy);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job is awaiting retry
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::AwaitingRetry { .. }));
    }

    #[tokio::test]
    async fn test_job_permanent_failure() {
        let storage = Arc::new(MemoryStorage::new());
        let mut registry = WorkerRegistry::new();
        registry.register(TestWorker::new("test_method", false, false));
        let registry = Arc::new(registry);

        let config = WorkerConfig::new("test-worker");
        let processor = JobProcessor::new(registry, storage.clone(), config);

        let job = Job::new("test_method", vec!["arg1".to_string()]);
        let job_id = job.id.clone();

        // Store the job first
        storage.enqueue(&job).await.unwrap();

        // Process the job
        processor.process_job(job).await.unwrap();

        // Check that the job failed permanently
        let updated_job = storage.get(&job_id).await.unwrap().unwrap();
        assert!(matches!(updated_job.state, JobState::Failed { .. }));
    }
}