use chrono::{Duration, Utc};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use super::{
WorkerConfig, WorkerRegistry, WorkerResult,
cleanup::{DEFAULT_FAILED_TTL, DEFAULT_SUCCEEDED_TTL},
middleware::{self, JobMiddleware, TracingMiddleware},
retry::RetryPolicy,
worker::WorkerContext,
};
use crate::core::{Job, JobState};
use crate::error::{QmlError, Result};
use crate::storage::Storage;
/// Builds the middleware stack used when the caller does not supply one.
///
/// The stack holds a single entry: [`TracingMiddleware`].
fn default_middleware() -> Vec<Arc<dyn JobMiddleware>> {
    let tracing: Arc<dyn JobMiddleware> = Arc::new(TracingMiddleware);
    vec![tracing]
}
/// Callback invoked by [`JobProcessor`] after it has changed a job's state.
///
/// Arguments, in order: the job (already carrying its new state), the state
/// the job held before the change, and the state it holds now.
pub type StateChangeHook = Arc<dyn Fn(&Job, &JobState, &JobState) + Send + Sync>;
/// Runs individual jobs: looks up the worker for a job's method, executes it
/// through the middleware stack under a timeout, and records the resulting
/// state (succeeded, awaiting retry, or failed) in storage.
pub struct JobProcessor {
/// Registry queried with a job's method name to find the worker that runs it.
worker_registry: Arc<WorkerRegistry>,
/// Backend that job updates are written to after each state change.
storage: Arc<dyn Storage>,
/// Decides whether a failed attempt is retried and when the retry is due.
retry_policy: RetryPolicy,
/// Supplies the worker id, server name and per-job timeout used while processing.
worker_config: WorkerConfig,
/// Token cloned into every `WorkerContext` handed to a worker.
cancel_token: CancellationToken,
/// Added to the current time to compute `expires_at` for succeeded jobs.
succeeded_ttl: Duration,
/// Added to the current time to compute `expires_at` for permanently failed jobs.
failed_ttl: Duration,
/// Middleware stack passed to `middleware::run_stack` together with the worker.
middleware: Vec<Arc<dyn JobMiddleware>>,
/// Optional callback fired after state changes; `None` means no callback.
on_state_change: Option<StateChangeHook>,
}
impl JobProcessor {
pub fn new(
worker_registry: Arc<WorkerRegistry>,
storage: Arc<dyn Storage>,
worker_config: WorkerConfig,
) -> Self {
Self {
worker_registry,
storage,
retry_policy: RetryPolicy::default(),
worker_config,
cancel_token: CancellationToken::new(),
succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
failed_ttl: DEFAULT_FAILED_TTL,
middleware: default_middleware(),
on_state_change: None,
}
}
/// Creates a processor that uses the supplied `retry_policy`.
///
/// The remaining settings start at their defaults: a fresh cancellation
/// token, `DEFAULT_SUCCEEDED_TTL` / `DEFAULT_FAILED_TTL`, the default
/// middleware stack, and no state-change hook.
pub fn with_retry_policy(
    worker_registry: Arc<WorkerRegistry>,
    storage: Arc<dyn Storage>,
    worker_config: WorkerConfig,
    retry_policy: RetryPolicy,
) -> Self {
    let cancel_token = CancellationToken::new();
    let middleware = default_middleware();

    Self {
        // Caller-provided collaborators.
        worker_registry,
        storage,
        worker_config,
        retry_policy,
        // Defaults that the `with_*` builder methods can override.
        cancel_token,
        middleware,
        succeeded_ttl: DEFAULT_SUCCEEDED_TTL,
        failed_ttl: DEFAULT_FAILED_TTL,
        on_state_change: None,
    }
}
pub fn with_cancellation(mut self, cancel_token: CancellationToken) -> Self {
self.cancel_token = cancel_token;
self
}
pub fn with_ttls(mut self, succeeded_ttl: Duration, failed_ttl: Duration) -> Self {
self.succeeded_ttl = succeeded_ttl;
self.failed_ttl = failed_ttl;
self
}
pub fn with_middleware(mut self, middleware: Vec<Arc<dyn JobMiddleware>>) -> Self {
self.middleware = middleware;
self
}
pub fn with_state_change_hook(mut self, hook: StateChangeHook) -> Self {
self.on_state_change = Some(hook);
self
}
/// Moves `job` into `new_state` and, on success, notifies the state-change
/// hook with the state the job held beforehand.
///
/// Returns the error from `Job::set_state` unchanged if the transition is
/// rejected; the hook is not called in that case.
fn apply_state_change(&self, job: &mut Job, new_state: JobState) -> Result<()> {
    let previous = job.state.clone();
    job.set_state(new_state)?;

    // From here on the job is only read.
    let job: &Job = job;
    if let Some(hook) = self.on_state_change.as_ref() {
        hook(job, &previous, &job.state);
    }
    Ok(())
}
/// Calls the configured state-change hook, if any, with `job`, the state it
/// held before (`prev_state`) and its current state. Does nothing when no
/// hook is installed.
fn fire_state_change_hook(&self, job: &Job, prev_state: &JobState) {
    if let Some(notify) = self.on_state_change.as_deref() {
        notify(job, prev_state, &job.state);
    }
}
/// Returns the worker id taken from this processor's `WorkerConfig`.
pub fn get_worker_id(&self) -> &str {
    let config = &self.worker_config;
    &config.worker_id
}
/// Runs one job through its registered worker and records the outcome.
///
/// Steps:
/// 1. Increment the job's attempt counter.
/// 2. Look up the worker for `job.method`; with no worker the job is failed
///    permanently.
/// 3. Move the job to `Processing` (unless it already is) and persist it.
/// 4. Execute the worker through the middleware stack under the configured
///    job timeout.
/// 5. Dispatch on the result: success, retry, permanent failure, execution
///    error, or timeout.
///
/// # Errors
///
/// Returns an error if a state transition is rejected by `Job::set_state`
/// or if writing the job to storage fails.
pub async fn process_job(&self, mut job: Job) -> Result<()> {
    let job_id = job.id.clone();
    let method = job.method.clone();
    info!("Starting job processing: {} ({})", job_id, method);

    job.attempt = job.attempt.saturating_add(1);

    let worker = match self.worker_registry.get_worker(&method) {
        Some(worker) => worker,
        None => {
            error!("No worker found for method: {}", method);
            return self
                .fail_job_permanently(
                    &mut job,
                    format!("No worker registered for method: {}", method),
                    None,
                )
                .await;
        }
    };

    // Read the previous failure BEFORE the job is moved to `Processing`:
    // that transition replaces the AwaitingRetry/Failed state that carries
    // the exception text, after which it can no longer be recovered.
    let previous_exception = if job.attempt > 1 {
        self.extract_previous_exception(&job)
    } else {
        None
    };

    if !matches!(job.state, JobState::Processing { .. }) {
        let processing_state = JobState::processing(
            &self.worker_config.worker_id,
            &self.worker_config.server_name,
        );
        if let Err(e) = self.apply_state_change(&mut job, processing_state) {
            error!("Failed to set job state to Processing: {}", e);
            return Err(e);
        }
        if let Err(e) = self.storage.update(&job).await {
            error!("Failed to update job state in storage: {}", e);
            return Err(e.into());
        }
    }

    let context = if job.attempt > 1 {
        WorkerContext::retry_from(self.worker_config.clone(), job.attempt, previous_exception)
    } else {
        WorkerContext::new(self.worker_config.clone())
    }
    .with_cancel(self.cancel_token.clone());

    // The conversion fails for a negative configured timeout. Treat that as
    // "no time allowed" instead of panicking inside the processing loop.
    let timeout = match self.worker_config.job_timeout.to_std() {
        Ok(timeout) => timeout,
        Err(_) => {
            warn!(
                "Job {} has an invalid timeout {:?}; using a zero timeout",
                job_id, self.worker_config.job_timeout
            );
            std::time::Duration::ZERO
        }
    };

    // Measure with the monotonic clock so a wall-clock adjustment during
    // execution cannot produce a negative (and, once cast, huge) duration.
    let started = std::time::Instant::now();
    let execution_result = match tokio::time::timeout(
        timeout,
        middleware::run_stack(&self.middleware, worker, &job, &context),
    )
    .await
    {
        Ok(result) => result,
        Err(_) => {
            warn!(
                "Job {} timed out after {:?}",
                job_id, self.worker_config.job_timeout
            );
            return self.handle_job_timeout(&mut job).await;
        }
    };
    let duration = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);

    match execution_result {
        Ok(WorkerResult::Success {
            result, metadata, ..
        }) => {
            info!("Job {} completed successfully in {}ms", job_id, duration);
            self.complete_job_successfully(&mut job, result, duration, metadata)
                .await
        }
        Ok(WorkerResult::Retry {
            error, retry_at, ..
        }) => {
            warn!("Job {} failed and will be retried: {}", job_id, error);
            self.handle_job_retry(&mut job, error, retry_at).await
        }
        Ok(WorkerResult::Failure {
            error, context: _, ..
        }) => {
            error!("Job {} failed permanently: {}", job_id, error);
            self.fail_job_permanently(&mut job, error, None).await
        }
        Err(e) => {
            error!("Job {} execution error: {}", job_id, e);
            self.handle_execution_error(&mut job, e).await
        }
    }
}
/// Records a successful run: moves the job to `Succeeded`, sets its expiry
/// from `succeeded_ttl`, copies the worker's metadata onto the job under
/// `exec_`-prefixed keys, and persists the job.
///
/// A job that is already in a final state is left untouched.
async fn complete_job_successfully(
    &self,
    job: &mut Job,
    result: Option<String>,
    duration_ms: u64,
    metadata: std::collections::HashMap<String, String>,
) -> Result<()> {
    let already_final = job.state.is_final();
    if already_final {
        debug!(
            "Job {} is already in a final state, skipping success",
            job.id
        );
        return Ok(());
    }

    let transition = self.apply_state_change(job, JobState::succeeded(duration_ms, result));
    if let Err(e) = transition {
        error!("Failed to set job state to Succeeded: {}", e);
        return Err(e);
    }

    let expiry = Utc::now() + self.succeeded_ttl;
    job.expires_at = Some(expiry);

    metadata.into_iter().for_each(|(key, value)| {
        job.add_metadata(format!("exec_{}", key), value);
    });

    self.storage.update(job).await?;
    Ok(())
}
async fn handle_job_retry(
&self,
job: &mut Job,
error: String,
retry_at: Option<chrono::DateTime<Utc>>,
) -> Result<()> {
if job.state.is_final() {
debug!("Job {} is already in a final state, skipping retry", job.id);
return Ok(());
}
if !self.should_retry_attempt(job, None) {
debug!(
"Retry limit exceeded for job {}, failing permanently",
job.id
);
return self.fail_job_permanently(job, error, None).await;
}
let pre_retry_state = job.state.clone();
let failed_state = JobState::failed(error.clone(), None);
if let Err(e) = job.set_state(failed_state) {
error!("Failed to set job state to Failed: {}", e);
return Err(e);
}
let retry_time = retry_at
.or_else(|| self.retry_policy.calculate_retry_time(job.attempt))
.unwrap_or_else(|| Utc::now() + chrono::Duration::seconds(60));
let retry_state = JobState::awaiting_retry(retry_time, &error);
if let Err(e) = job.set_state(retry_state) {
error!("Failed to set job state to AwaitingRetry: {}", e);
return Err(e);
}
self.fire_state_change_hook(job, &pre_retry_state);
self.storage.update(job).await?;
info!(
"Job {} scheduled for retry (attempt #{}) at {}",
job.id, job.attempt, retry_time
);
Ok(())
}
async fn fail_job_permanently(
&self,
job: &mut Job,
error: String,
stack_trace: Option<String>,
) -> Result<()> {
if job.state.is_final() {
debug!(
"Job {} is already in a final state, skipping failure",
job.id
);
return Ok(());
}
let failed_state = JobState::failed(error, stack_trace);
if let Err(e) = self.apply_state_change(job, failed_state) {
error!("Failed to set job state to Failed: {}", e);
return Err(e);
}
job.expires_at = Some(Utc::now() + self.failed_ttl);
self.storage.update(job).await?;
error!(
"Job {} failed permanently after {} attempts",
job.id, job.attempt
);
Ok(())
}
/// Handles a job whose execution exceeded the configured timeout: retries it
/// when the limits allow a retry for `"TimeoutError"`, otherwise fails it
/// permanently.
async fn handle_job_timeout(&self, job: &mut Job) -> Result<()> {
    let message = format!("Job timed out after {:?}", self.worker_config.job_timeout);

    if !self.should_retry_attempt(job, Some("TimeoutError")) {
        return self.fail_job_permanently(job, message, None).await;
    }
    self.handle_job_retry(job, message, None).await
}
/// Handles an `Err` returned from job execution.
///
/// The error is mapped to a type name (`"StorageError"`, `"WorkerError"`,
/// `"TimeoutError"`, or `"UnknownError"` for anything else); the job is
/// retried when the limits allow a retry for that name and failed
/// permanently otherwise.
async fn handle_execution_error(&self, job: &mut Job, error: QmlError) -> Result<()> {
    let kind = match &error {
        QmlError::StorageError { .. } => "StorageError",
        QmlError::WorkerError { .. } => "WorkerError",
        QmlError::TimeoutError { .. } => "TimeoutError",
        _ => "UnknownError",
    };
    let message = error.to_string();

    if !self.should_retry_attempt(job, Some(kind)) {
        return self.fail_job_permanently(job, message, None).await;
    }
    self.handle_job_retry(job, message, None).await
}
/// Decides whether the job's current attempt may be followed by a retry.
///
/// The job's own `max_retries` is checked first (a value of 0 means the job
/// sets no limit of its own); if that limit is not exceeded the decision is
/// delegated to the retry policy.
fn should_retry_attempt(&self, job: &Job, exception_type: Option<&str>) -> bool {
    let has_job_limit = job.max_retries > 0;
    let job_limit_exceeded = has_job_limit && job.attempt > job.max_retries;

    !job_limit_exceeded && self.retry_policy.should_retry(exception_type, job.attempt)
}
/// Returns the exception text stored in the job's current state when that
/// state is `AwaitingRetry` or `Failed`, and `None` for every other state.
fn extract_previous_exception(&self, job: &Job) -> Option<String> {
    match &job.state {
        JobState::AwaitingRetry {
            last_exception: message,
            ..
        }
        | JobState::Failed {
            exception: message, ..
        } => Some(message.clone()),
        _ => None,
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::processing::{RetryStrategy, Worker};
use crate::storage::{MemoryStorage, MonitoringApi};
use async_trait::async_trait;
use chrono::Duration;
use std::sync::Arc;
/// Test double whose outcome is fixed at construction time.
struct TestWorker {
/// Method name this worker reports from `method_name`.
method: String,
/// When true, `execute` returns a success result.
should_succeed: bool,
/// When `should_succeed` is false: true yields a retry result, false a failure result.
should_retry: bool,
}
impl TestWorker {
    /// Creates a worker for `method` with a fixed outcome: success when
    /// `should_succeed` is true, otherwise a retry request when
    /// `should_retry` is true, otherwise a permanent failure.
    fn new(method: &str, should_succeed: bool, should_retry: bool) -> Self {
        let method = method.to_owned();
        Self {
            method,
            should_succeed,
            should_retry,
        }
    }
}
#[async_trait]
impl Worker for TestWorker {
async fn execute(&self, _job: &Job, _context: &WorkerContext) -> Result<WorkerResult> {
if self.should_succeed {
Ok(WorkerResult::success(Some("Test result".to_string()), 100))
} else if self.should_retry {
Ok(WorkerResult::retry("Test error".to_string(), None))
} else {
Ok(WorkerResult::failure("Permanent failure".to_string()))
}
}
fn method_name(&self) -> &str {
&self.method
}
}
/// A worker that reports success leaves the stored job in `Succeeded`.
#[tokio::test]
async fn test_successful_job_processing() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("test_method", true, false));
        Arc::new(workers)
    };
    let processor = JobProcessor::new(registry, storage.clone(), WorkerConfig::new("test-worker"));

    let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    processor.process_job(job).await.unwrap();

    let stored = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(stored.state, JobState::Succeeded { .. }));
}
/// A worker that asks for a retry leaves the stored job in `AwaitingRetry`
/// while the retry policy still allows another attempt.
#[tokio::test]
async fn test_job_retry() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("test_method", false, true));
        Arc::new(workers)
    };
    let policy = RetryPolicy::new(RetryStrategy::fixed(chrono::Duration::seconds(1), 2));
    let processor = JobProcessor::with_retry_policy(
        registry,
        storage.clone(),
        WorkerConfig::new("test-worker"),
        policy,
    );

    let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    processor.process_job(job).await.unwrap();

    let stored = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(stored.state, JobState::AwaitingRetry { .. }));
}
/// A worker that reports a permanent failure leaves the stored job in
/// `Failed`.
#[tokio::test]
async fn test_job_permanent_failure() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("test_method", false, false));
        Arc::new(workers)
    };
    let processor = JobProcessor::new(registry, storage.clone(), WorkerConfig::new("test-worker"));

    let job = Job::new("test_method", serde_json::json!(["arg1".to_string()]));
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    processor.process_job(job).await.unwrap();

    let stored = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(stored.state, JobState::Failed { .. }));
}
/// With a policy limit of one retry, the first run ends in `AwaitingRetry`
/// and the second run (after re-enqueueing) ends in `Failed`.
#[tokio::test]
async fn test_job_respects_retry_limit() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("limited_retry_method", false, true));
        Arc::new(workers)
    };
    let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 1));
    let processor = JobProcessor::with_retry_policy(
        registry,
        storage.clone(),
        WorkerConfig::new("test-worker"),
        policy,
    );

    let job = Job::new("limited_retry_method", serde_json::Value::Null);
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    // First attempt: still within the limit, so a retry is scheduled.
    processor.process_job(job.clone()).await.unwrap();
    let mut after_first = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(after_first.state, JobState::AwaitingRetry { .. }));
    assert_eq!(after_first.attempt, 1);

    // Put the job back on its queue, as if the retry had become due.
    let requeued = JobState::enqueued(&after_first.queue);
    after_first.set_state(requeued).unwrap();
    storage.update(&after_first).await.unwrap();

    // Second attempt: the limit is exhausted, so the job fails for good.
    processor.process_job(after_first).await.unwrap();
    let after_second = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(after_second.state, JobState::Failed { .. }));
    assert_eq!(after_second.attempt, 2);
}
/// A job created with `max_retries = 1` fails permanently on its second
/// attempt even though the processor's retry policy would allow five.
#[tokio::test]
async fn test_job_respects_job_specific_max_retries() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("job_specific_limit", false, true));
        Arc::new(workers)
    };
    let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 5));
    let processor = JobProcessor::with_retry_policy(
        registry,
        storage.clone(),
        WorkerConfig::new("test-worker"),
        policy,
    );

    let job = Job::with_config(
        "job_specific_limit",
        serde_json::Value::Null,
        "default",
        0,
        1,
    );
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    // First attempt: within the job's own limit, so a retry is scheduled.
    processor.process_job(job.clone()).await.unwrap();
    let mut after_first = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(after_first.state, JobState::AwaitingRetry { .. }));
    assert_eq!(after_first.attempt, 1);

    // Put the job back on its queue, as if the retry had become due.
    let requeued = JobState::enqueued(&after_first.queue);
    after_first.set_state(requeued).unwrap();
    storage.update(&after_first).await.unwrap();

    // Second attempt: the job's own limit is exceeded.
    processor.process_job(after_first).await.unwrap();
    let after_second = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(after_second.state, JobState::Failed { .. }));
    assert_eq!(after_second.attempt, 2);
}
/// Re-enqueueing a failed job by hand and processing it again counts as a
/// second attempt.
#[tokio::test]
async fn failed_to_enqueued_to_failed_increments_attempt() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("manual_retry_method", false, false));
        Arc::new(workers)
    };
    let processor = JobProcessor::new(registry, storage.clone(), WorkerConfig::new("test-worker"));

    let job = Job::new("manual_retry_method", serde_json::Value::Null);
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    // First run fails permanently.
    processor.process_job(job).await.unwrap();
    let mut requeued_job = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(requeued_job.state, JobState::Failed { .. }));
    assert_eq!(requeued_job.attempt, 1);

    // Manually move the failed job back onto its queue.
    let enqueued = JobState::enqueued(&requeued_job.queue);
    requeued_job.set_state(enqueued).unwrap();
    storage.update(&requeued_job).await.unwrap();

    // Second run fails again and bumps the attempt counter.
    processor.process_job(requeued_job).await.unwrap();
    let after_second = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(after_second.state, JobState::Failed { .. }));
    assert_eq!(after_second.attempt, 2);
}
use crate::core::JobStateKind;
use std::sync::Mutex;
fn install_recording_hook(
processor: JobProcessor,
) -> (JobProcessor, Arc<Mutex<Vec<(JobStateKind, JobStateKind)>>>) {
let transitions = Arc::new(Mutex::new(Vec::<(JobStateKind, JobStateKind)>::new()));
let captured = transitions.clone();
let hook: StateChangeHook = Arc::new(move |_job, prev, new| {
captured.lock().unwrap().push((prev.kind(), new.kind()));
});
(processor.with_state_change_hook(hook), transitions)
}
/// On success the hook sees Enqueued -> Processing -> Succeeded.
#[tokio::test]
async fn state_change_hook_fires_for_successful_path() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("hook_success", true, false));
        Arc::new(workers)
    };
    let (processor, recorded) = install_recording_hook(JobProcessor::new(
        registry,
        storage.clone(),
        WorkerConfig::new("hook-worker"),
    ));

    let job = Job::new("hook_success", serde_json::Value::Null);
    storage.enqueue(&job).await.unwrap();
    processor.process_job(job).await.unwrap();

    let expected = vec![
        (JobStateKind::Enqueued, JobStateKind::Processing),
        (JobStateKind::Processing, JobStateKind::Succeeded),
    ];
    let seen = recorded.lock().unwrap();
    assert_eq!(*seen, expected);
}
/// On a retry the hook sees Processing -> AwaitingRetry as one transition;
/// the intermediate `Failed` state is not reported.
#[tokio::test]
async fn state_change_hook_skips_intermediate_failed_in_retry_path() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("hook_retry", false, true));
        Arc::new(workers)
    };
    let policy = RetryPolicy::new(RetryStrategy::fixed(Duration::seconds(1), 3));
    let (processor, recorded) = install_recording_hook(JobProcessor::with_retry_policy(
        registry,
        storage.clone(),
        WorkerConfig::new("hook-worker"),
        policy,
    ));

    let job = Job::new("hook_retry", serde_json::Value::Null);
    storage.enqueue(&job).await.unwrap();
    processor.process_job(job).await.unwrap();

    let expected = vec![
        (JobStateKind::Enqueued, JobStateKind::Processing),
        (JobStateKind::Processing, JobStateKind::AwaitingRetry),
    ];
    let seen = recorded.lock().unwrap();
    assert_eq!(*seen, expected);
}
/// On a permanent failure the hook sees Enqueued -> Processing -> Failed.
#[tokio::test]
async fn state_change_hook_fires_for_permanent_failure() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("hook_fail", false, false));
        Arc::new(workers)
    };
    let (processor, recorded) = install_recording_hook(JobProcessor::new(
        registry,
        storage.clone(),
        WorkerConfig::new("hook-worker"),
    ));

    let job = Job::new("hook_fail", serde_json::Value::Null);
    storage.enqueue(&job).await.unwrap();
    processor.process_job(job).await.unwrap();

    let expected = vec![
        (JobStateKind::Enqueued, JobStateKind::Processing),
        (JobStateKind::Processing, JobStateKind::Failed),
    ];
    let seen = recorded.lock().unwrap();
    assert_eq!(*seen, expected);
}
/// Without a hook installed, processing still completes and the job ends in
/// `Succeeded`.
#[tokio::test]
async fn state_change_hook_is_opt_in_default_is_no_op() {
    let storage = Arc::new(MemoryStorage::new());
    let registry = {
        let mut workers = WorkerRegistry::new();
        workers.register(TestWorker::new("no_hook", true, false));
        Arc::new(workers)
    };
    let processor = JobProcessor::new(registry, storage.clone(), WorkerConfig::new("hook-worker"));

    let job = Job::new("no_hook", serde_json::Value::Null);
    let job_id = job.id.clone();
    storage.enqueue(&job).await.unwrap();

    processor.process_job(job).await.unwrap();

    let stored = storage.get(&job_id).await.unwrap().unwrap();
    assert!(matches!(stored.state, JobState::Succeeded { .. }));
}
}